diff --git a/Cargo.lock b/Cargo.lock index eca3268598..93fdf91ddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4925,6 +4925,7 @@ dependencies = [ "futures", "hex", "human_bytes", + "humantime", "hyper", "jemallocator", "metrics-exporter-prometheus", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 3f96e825b9..ddaa59a9a9 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -83,6 +83,7 @@ backon = "0.4" hex = "0.4" thiserror = { workspace = true } pretty_assertions = "1.3.0" +humantime = "2.1.0" [features] jemalloc = ["dep:jemallocator"] diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 04b36b59cc..9ab5a326a6 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -7,7 +7,7 @@ use reth_interfaces::consensus::ForkchoiceState; use reth_network::{NetworkEvent, NetworkHandle}; use reth_network_api::PeersInfo; use reth_primitives::{ - stage::{StageCheckpoint, StageId}, + stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, BlockNumber, }; use reth_stages::{ExecOutput, PipelineEvent}; @@ -15,10 +15,10 @@ use std::{ future::Future, pin::Pin, task::{Context, Poll}, - time::Duration, + time::{Duration, Instant}, }; use tokio::time::Interval; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; /// Interval of reporting node state. const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(30); @@ -29,6 +29,8 @@ struct NodeState { network: Option, /// The stage currently being executed. current_stage: Option, + /// The ETA for the current stage. + eta: Eta, /// The current checkpoint of the executing stage. current_checkpoint: StageCheckpoint, /// The latest canonical block added in the consensus engine. @@ -40,6 +42,7 @@ impl NodeState { Self { network, current_stage: None, + eta: Eta::default(), current_checkpoint: StageCheckpoint::new(0), latest_canonical_engine_block: None, } @@ -59,11 +62,11 @@ impl NodeState { if notable { info!( - target: "reth::cli", pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), stage = %stage_id, from = self.current_checkpoint.block_number, checkpoint = %self.current_checkpoint, + eta = %self.eta, "Executing stage", ); } @@ -75,17 +78,14 @@ impl NodeState { result: ExecOutput { checkpoint, done }, } => { self.current_checkpoint = checkpoint; - - if done { - self.current_stage = None; - } + self.eta.update(self.current_checkpoint); info!( - target: "reth::cli", pipeline_stages = %format!("{pipeline_position}/{pipeline_total}"), stage = %stage_id, - progress = checkpoint.block_number, + block = checkpoint.block_number, %checkpoint, + eta = %self.eta, "{}", if done { "Stage finished executing" @@ -93,22 +93,20 @@ impl NodeState { "Stage committed progress" } ); + + if done { + self.current_stage = None; + self.eta = Eta::default(); + } } _ => (), } } - fn handle_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionEstablished { peer_id, status, .. } => { - info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); - } - NetworkEvent::SessionClosed { peer_id, reason } => { - let reason = reason.map(|s| s.to_string()).unwrap_or_else(|| "None".to_string()); - debug!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, %reason, "Peer disconnected."); - } - _ => (), - } + fn handle_network_event(&mut self, _: NetworkEvent) { + // NOTE(onbjerg): This used to log established/disconnecting sessions, but this is already + // logged in the networking component. I kept this stub in case we want to catch other + // networking events later on. } fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) { @@ -117,7 +115,6 @@ impl NodeState { let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } = state; info!( - target: "reth::cli", ?head_block_hash, ?safe_block_hash, ?finalized_block_hash, @@ -128,10 +125,10 @@ impl NodeState { BeaconConsensusEngineEvent::CanonicalBlockAdded(block) => { self.latest_canonical_engine_block = Some(block.number); - info!(target: "reth::cli", number=block.number, hash=?block.hash, "Block added to canonical chain"); + info!(number=block.number, hash=?block.hash, "Block added to canonical chain"); } BeaconConsensusEngineEvent::ForkBlockAdded(block) => { - info!(target: "reth::cli", number=block.number, hash=?block.hash, "Block added to fork chain"); + info!(number=block.number, hash=?block.hash, "Block added to fork chain"); } } } @@ -139,16 +136,16 @@ impl NodeState { fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) { match event { ConsensusLayerHealthEvent::NeverSeen => { - warn!(target: "reth::cli", "Post-merge network, but never seen beacon client. Please launch one to follow the chain!") + warn!("Post-merge network, but never seen beacon client. Please launch one to follow the chain!") } ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => { - warn!(target: "reth::cli", ?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!") + warn!(?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!") } ConsensusLayerHealthEvent::NeverReceivedUpdates => { - warn!(target: "reth::cli", "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!") + warn!("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!") } ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => { - warn!(target: "reth::cli", ?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!") + warn!(?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!") } } } @@ -232,6 +229,7 @@ where connected_peers = this.state.num_connected_peers(), %stage, checkpoint = %this.state.current_checkpoint, + eta = %this.state.eta, "Status" ); } else { @@ -264,3 +262,51 @@ where Poll::Pending } } + +/// A container calculating the estimated time that a stage will complete in, based on stage +/// checkpoints reported by the pipeline. +/// +/// One `Eta` is only valid for a single stage. +#[derive(Default)] +struct Eta { + /// The last stage checkpoint + last_checkpoint: EntitiesCheckpoint, + /// The last time the stage reported its checkpoint + last_checkpoint_time: Option, + /// The current ETA + eta: Option, +} + +impl Eta { + /// Update the ETA given the checkpoint. + fn update(&mut self, checkpoint: StageCheckpoint) { + let current = checkpoint.entities(); + + if let Some(last_checkpoint_time) = &self.last_checkpoint_time { + let processed_since_last = current.processed - self.last_checkpoint.processed; + let elapsed = last_checkpoint_time.elapsed(); + let per_second = processed_since_last as f64 / elapsed.as_secs_f64(); + + self.eta = Some(Duration::from_secs_f64( + (current.total - current.processed) as f64 / per_second, + )); + } + + self.last_checkpoint = current; + self.last_checkpoint_time = Some(Instant::now()); + } +} + +impl std::fmt::Display for Eta { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) { + let remaining = eta.checked_sub(last_checkpoint_time.elapsed()); + + if let Some(remaining) = remaining { + return write!(f, "{}", humantime::format_duration(remaining)) + } + } + + write!(f, "unknown") + } +} diff --git a/crates/primitives/src/stage/checkpoints.rs b/crates/primitives/src/stage/checkpoints.rs index daa98f20b0..f6ee35c901 100644 --- a/crates/primitives/src/stage/checkpoints.rs +++ b/crates/primitives/src/stage/checkpoints.rs @@ -216,6 +216,29 @@ impl StageCheckpoint { self.block_number = block_number; self } + + /// Get the underlying [`EntitiesCheckpoint`] to determine the number of entities processed, and + /// the number of total entities to process. + pub fn entities(&self) -> EntitiesCheckpoint { + match self.stage_checkpoint { + Some( + StageUnitCheckpoint::Account(AccountHashingCheckpoint { + progress: entities, .. + }) | + StageUnitCheckpoint::Storage(StorageHashingCheckpoint { + progress: entities, .. + }) | + StageUnitCheckpoint::Entities(entities) | + StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress: entities, .. }) | + StageUnitCheckpoint::Headers(HeadersCheckpoint { progress: entities, .. }) | + StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { + progress: entities, + .. + }), + ) => entities, + None => EntitiesCheckpoint::default(), + } + } } impl Display for StageCheckpoint { diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index e88b237735..e8ceaf874b 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -355,7 +355,7 @@ where Ok(out @ ExecOutput { checkpoint, done }) => { made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; - info!( + debug!( target: "sync::pipeline", stage = %stage_id, progress = checkpoint.block_number,