From 06eeb35366f5fb43b4f59fd09723f2fd1586dcdf Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 21 Nov 2023 23:57:42 +0000 Subject: [PATCH] feat(bin): improve status logs (#5518) --- bin/reth/src/chain/import.rs | 4 +- bin/reth/src/debug_cmd/execution.rs | 4 +- bin/reth/src/node/events.rs | 215 ++++++++++++--------- bin/reth/src/node/mod.rs | 2 +- crates/primitives/src/stage/checkpoints.rs | 18 +- crates/stages/src/pipeline/event.rs | 11 +- crates/stages/src/pipeline/mod.rs | 90 ++++----- 7 files changed, 194 insertions(+), 150 deletions(-) diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 7478aad47f..2d1800fc36 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -109,12 +109,12 @@ impl ImportCommand { pipeline.set_tip(tip); debug!(target: "reth::cli", ?tip, "Tip manually set"); - let factory = ProviderFactory::new(&db, self.chain.clone()); + let factory = ProviderFactory::new(db.clone(), self.chain.clone()); let provider = factory.provider()?; let latest_block_number = provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number); - tokio::spawn(handle_events(None, latest_block_number, events)); + tokio::spawn(handle_events(None, latest_block_number, events, db.clone())); // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index c248819a0e..8a91ca73f0 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -235,7 +235,7 @@ impl Command { &ctx.task_executor, )?; - let factory = ProviderFactory::new(&db, self.chain.clone()); + let factory = ProviderFactory::new(db.clone(), self.chain.clone()); let provider = factory.provider()?; let latest_block_number = @@ -252,7 +252,7 @@ impl Command { ); ctx.task_executor.spawn_critical( "events task", - events::handle_events(Some(network.clone()), latest_block_number, events), + events::handle_events(Some(network.clone()), latest_block_number, events, db.clone()), ); let mut current_max_block = latest_block_number.unwrap_or_default(); diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 8b5d7c76ad..0fb17f43b3 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -3,6 +3,7 @@ use crate::node::cl_events::ConsensusLayerHealthEvent; use futures::Stream; use reth_beacon_consensus::BeaconConsensusEngineEvent; +use reth_db::DatabaseEnv; use reth_interfaces::consensus::ForkchoiceState; use reth_network::{NetworkEvent, NetworkHandle}; use reth_network_api::PeersInfo; @@ -13,8 +14,10 @@ use reth_primitives::{ use reth_prune::PrunerEvent; use reth_stages::{ExecOutput, PipelineEvent}; use std::{ + fmt::{Display, Formatter}, future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -26,27 +29,25 @@ const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25); /// The current high-level state of the node. struct NodeState { + /// Database environment. + /// Used for freelist calculation reported in the "Status" log message. + /// See [EventHandler::poll]. + db: Arc, /// Connection to the network. network: Option, /// The stage currently being executed. - current_stage: Option, - /// The ETA for the current stage. - eta: Eta, - /// The current checkpoint of the executing stage. - current_checkpoint: StageCheckpoint, + current_stage: Option, /// The latest block reached by either pipeline or consensus engine. latest_block: Option, } impl NodeState { - fn new(network: Option, latest_block: Option) -> Self { - Self { - network, - current_stage: None, - eta: Eta::default(), - current_checkpoint: StageCheckpoint::new(0), - latest_block, - } + fn new( + db: Arc, + network: Option, + latest_block: Option, + ) -> Self { + Self { db, network, current_stage: None, latest_block } } fn num_connected_peers(&self) -> usize { @@ -56,70 +57,80 @@ impl NodeState { /// Processes an event emitted by the pipeline fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { - PipelineEvent::Running { pipeline_stages_progress, stage_id, checkpoint } => { - let notable = self.current_stage.is_none(); - self.current_stage = Some(stage_id); - self.current_checkpoint = checkpoint.unwrap_or_default(); + PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => { + let checkpoint = checkpoint.unwrap_or_default(); + let current_stage = CurrentStage { + stage_id, + eta: match &self.current_stage { + Some(current_stage) if current_stage.stage_id == stage_id => { + current_stage.eta + } + _ => Eta::default(), + }, + checkpoint, + target, + }; - if notable { - if let Some(progress) = self.current_checkpoint.entities() { - info!( - pipeline_stages = %pipeline_stages_progress, - stage = %stage_id, - from = self.current_checkpoint.block_number, - checkpoint = %self.current_checkpoint.block_number, - %progress, - eta = %self.eta.fmt_for_stage(stage_id), - "Executing stage", - ); - } else { - info!( - pipeline_stages = %pipeline_stages_progress, - stage = %stage_id, - from = self.current_checkpoint.block_number, - checkpoint = %self.current_checkpoint.block_number, - eta = %self.eta.fmt_for_stage(stage_id), - "Executing stage", - ); - } - } + let progress = OptionalField( + checkpoint.entities().and_then(|entities| entities.fmt_percentage()), + ); + let eta = current_stage.eta.fmt_for_stage(stage_id); + + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + target = %OptionalField(target), + %progress, + %eta, + "Executing stage", + ); + + self.current_stage = Some(current_stage); } PipelineEvent::Ran { pipeline_stages_progress, stage_id, result: ExecOutput { checkpoint, done }, } => { - self.current_checkpoint = checkpoint; if stage_id.is_finish() { self.latest_block = Some(checkpoint.block_number); } - self.eta.update(self.current_checkpoint); - let message = - if done { "Stage finished executing" } else { "Stage committed progress" }; + if let Some(current_stage) = self.current_stage.as_mut() { + current_stage.checkpoint = checkpoint; + current_stage.eta.update(checkpoint); - if let Some(progress) = checkpoint.entities() { - info!( - pipeline_stages = %pipeline_stages_progress, - stage = %stage_id, - checkpoint = %checkpoint.block_number, - %progress, - eta = %self.eta.fmt_for_stage(stage_id), - "{message}", - ); - } else { - info!( - pipeline_stages = %pipeline_stages_progress, - stage = %stage_id, - checkpoint = %checkpoint.block_number, - eta = %self.eta.fmt_for_stage(stage_id), - "{message}", + let target = OptionalField(current_stage.target); + let progress = OptionalField( + checkpoint.entities().and_then(|entities| entities.fmt_percentage()), ); + + if done { + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + %target, + %progress, + "Stage finished executing", + ) + } else { + let eta = current_stage.eta.fmt_for_stage(stage_id); + info!( + pipeline_stages = %pipeline_stages_progress, + stage = %stage_id, + checkpoint = %checkpoint.block_number, + %target, + %progress, + %eta, + "Stage committed progress", + ) + } } if done { self.current_stage = None; - self.eta = Eta::default(); } } _ => (), @@ -189,6 +200,29 @@ impl NodeState { } } +/// Helper type for formatting of optional fields: +/// - If [Some(x)], then `x` is written +/// - If [None], then `None` is written +struct OptionalField(Option); + +impl Display for OptionalField { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(field) = &self.0 { + write!(f, "{field}") + } else { + write!(f, "None") + } + } +} + +/// The stage currently being executed. +struct CurrentStage { + stage_id: StageId, + eta: Eta, + checkpoint: StageCheckpoint, + target: Option, +} + /// A node event. #[derive(Debug)] pub enum NodeEvent { @@ -240,10 +274,11 @@ pub async fn handle_events( network: Option, latest_block_number: Option, events: E, + db: Arc, ) where E: Stream + Unpin, { - let state = NodeState::new(network, latest_block_number); + let state = NodeState::new(db, network, latest_block_number); let start = tokio::time::Instant::now() + Duration::from_secs(3); let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL); @@ -273,32 +308,40 @@ where let mut this = self.project(); while this.info_interval.poll_tick(cx).is_ready() { - if let Some(stage) = this.state.current_stage { - if let Some(progress) = this.state.current_checkpoint.entities() { - info!( - target: "reth::cli", - connected_peers = this.state.num_connected_peers(), - %stage, - checkpoint = %this.state.current_checkpoint.block_number, - %progress, - eta = %this.state.eta.fmt_for_stage(stage), - "Status" - ); - } else { - info!( - target: "reth::cli", - connected_peers = this.state.num_connected_peers(), - %stage, - checkpoint = %this.state.current_checkpoint.block_number, - eta = %this.state.eta.fmt_for_stage(stage), - "Status" - ); - } + let freelist = OptionalField(this.state.db.freelist().ok()); + + if let Some(CurrentStage { stage_id, eta, checkpoint, target }) = + &this.state.current_stage + { + let progress = OptionalField( + checkpoint.entities().and_then(|entities| entities.fmt_percentage()), + ); + let eta = eta.fmt_for_stage(*stage_id); + + info!( + target: "reth::cli", + connected_peers = this.state.num_connected_peers(), + %freelist, + stage = %stage_id, + checkpoint = checkpoint.block_number, + target = %OptionalField(*target), + %progress, + %eta, + "Status" + ); + } else if let Some(latest_block) = this.state.latest_block { + info!( + target: "reth::cli", + connected_peers = this.state.num_connected_peers(), + %freelist, + %latest_block, + "Status" + ); } else { info!( target: "reth::cli", connected_peers = this.state.num_connected_peers(), - latest_block = this.state.latest_block.unwrap_or(this.state.current_checkpoint.block_number), + %freelist, "Status" ); } @@ -332,7 +375,7 @@ where /// checkpoints reported by the pipeline. /// /// One `Eta` is only valid for a single stage. -#[derive(Default)] +#[derive(Default, Copy, Clone)] struct Eta { /// The last stage checkpoint last_checkpoint: EntitiesCheckpoint, @@ -375,8 +418,8 @@ impl Eta { } } -impl std::fmt::Display for Eta { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Display for Eta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) { let remaining = eta.checked_sub(last_checkpoint_time.elapsed()); diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 3019d0cd21..316168f4eb 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -516,7 +516,7 @@ impl NodeCommand { ); ctx.task_executor.spawn_critical( "events task", - events::handle_events(Some(network.clone()), Some(head.number), events), + events::handle_events(Some(network.clone()), Some(head.number), events, db.clone()), ); let engine_api = EngineApi::new( diff --git a/crates/primitives/src/stage/checkpoints.rs b/crates/primitives/src/stage/checkpoints.rs index 0304f0f727..b238c77e19 100644 --- a/crates/primitives/src/stage/checkpoints.rs +++ b/crates/primitives/src/stage/checkpoints.rs @@ -5,10 +5,7 @@ use crate::{ use bytes::{Buf, BufMut}; use reth_codecs::{derive_arbitrary, main_codec, Compact}; use serde::{Deserialize, Serialize}; -use std::{ - fmt::{Display, Formatter}, - ops::RangeInclusive, -}; +use std::ops::RangeInclusive; /// Saves the progress of Merkle stage. #[derive(Default, Debug, Clone, PartialEq)] @@ -169,9 +166,16 @@ pub struct EntitiesCheckpoint { pub total: u64, } -impl Display for EntitiesCheckpoint { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{:.2}%", 100.0 * self.processed as f64 / self.total as f64) +impl EntitiesCheckpoint { + /// Formats entities checkpoint as percentage, i.e. `processed / total`. + /// + /// Return [None] if `total == 0`. + pub fn fmt_percentage(&self) -> Option { + if self.total == 0 { + return None + } + + Some(format!("{:.2}%", 100.0 * self.processed as f64 / self.total as f64)) } } diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 05d7945d33..d5b02610a5 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -1,5 +1,8 @@ use crate::stage::{ExecOutput, UnwindInput, UnwindOutput}; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + BlockNumber, +}; use std::fmt::{Display, Formatter}; /// An event emitted by a [Pipeline][crate::Pipeline]. @@ -12,13 +15,15 @@ use std::fmt::{Display, Formatter}; #[derive(Debug, PartialEq, Eq, Clone)] pub enum PipelineEvent { /// Emitted when a stage is about to be run. - Running { + Run { /// Pipeline stages progress. pipeline_stages_progress: PipelineStagesProgress, /// The stage that is about to be run. stage_id: StageId, /// The previous checkpoint of the stage. checkpoint: Option, + /// The block number up to which the stage is running, if known. + target: Option, }, /// Emitted when a stage has run a single time. Ran { @@ -30,7 +35,7 @@ pub enum PipelineEvent { result: ExecOutput, }, /// Emitted when a stage is about to be unwound. - Unwinding { + Unwind { /// The stage that is about to be unwound. stage_id: StageId, /// The unwind parameters. diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 06f487858d..a48c5b462a 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -290,7 +290,7 @@ where ); while checkpoint.block_number > to { let input = UnwindInput { checkpoint, unwind_to: to, bad_block }; - self.listeners.notify(PipelineEvent::Unwinding { stage_id, input }); + self.listeners.notify(PipelineEvent::Unwind { stage_id, input }); let output = stage.unwind(&provider_rw, input); match output { @@ -378,13 +378,14 @@ where }; } - self.listeners.notify(PipelineEvent::Running { + self.listeners.notify(PipelineEvent::Run { pipeline_stages_progress: event::PipelineStagesProgress { current: stage_index + 1, total: total_stages, }, stage_id, checkpoint: prev_checkpoint, + target, }); let provider_rw = factory.provider_rw()?; @@ -393,26 +394,6 @@ where made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; - if let Some(progress) = checkpoint.entities() { - debug!( - target: "sync::pipeline", - stage = %stage_id, - checkpoint = checkpoint.block_number, - ?target, - %progress, - %done, - "Stage committed progress" - ); - } else { - debug!( - target: "sync::pipeline", - stage = %stage_id, - checkpoint = checkpoint.block_number, - ?target, - %done, - "Stage committed progress" - ); - } if let Some(metrics_tx) = &mut self.metrics_tx { let _ = metrics_tx.send(MetricEvent::StageCheckpoint { stage_id, @@ -608,20 +589,22 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, @@ -671,30 +654,33 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 }, stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, stage_id: StageId::Other("C"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 }, @@ -702,7 +688,7 @@ mod tests { result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, // Unwinding - PipelineEvent::Unwinding { + PipelineEvent::Unwind { stage_id: StageId::Other("C"), input: UnwindInput { checkpoint: StageCheckpoint::new(20), @@ -714,7 +700,7 @@ mod tests { stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, }, - PipelineEvent::Unwinding { + PipelineEvent::Unwind { stage_id: StageId::Other("B"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), @@ -726,7 +712,7 @@ mod tests { stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, }, - PipelineEvent::Unwinding { + PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), @@ -775,20 +761,22 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, @@ -798,7 +786,7 @@ mod tests { // Unwinding // Nothing to unwind in stage "B" PipelineEvent::Skipped { stage_id: StageId::Other("B") }, - PipelineEvent::Unwinding { + PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), @@ -865,23 +853,25 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Error { stage_id: StageId::Other("B") }, - PipelineEvent::Unwinding { + PipelineEvent::Unwind { stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), @@ -893,20 +883,22 @@ mod tests { stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), - checkpoint: Some(StageCheckpoint::new(0)) + checkpoint: Some(StageCheckpoint::new(0)), + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 }, stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { + PipelineEvent::Run { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 }, stage_id: StageId::Other("B"), - checkpoint: None + checkpoint: None, + target: Some(10), }, PipelineEvent::Ran { pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },