diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs
index 14afb37d81..2fdccd7837 100644
--- a/crates/stages/stages/src/stages/hashing_account.rs
+++ b/crates/stages/stages/src/stages/hashing_account.rs
@@ -1,3 +1,4 @@
+use crate::log_progress;
 use alloy_primitives::{keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -18,6 +19,7 @@ use std::{
     fmt::Debug,
     ops::{Range, RangeInclusive},
     sync::mpsc::{self, Receiver},
+    time::Instant,
 };
 use tracing::*;

@@ -186,16 +188,16 @@ where
             let mut hashed_account_cursor =
                 tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

-            let total_hashes = collector.len();
-            let interval = (total_hashes / 10).max(1);
+            let total = collector.len();
+            let mut last_log = Instant::now();
             for (index, item) in collector.iter()?.enumerate() {
-                if index > 0 && index % interval == 0 {
-                    info!(
-                        target: "sync::stages::hashing_account",
-                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
-                        "Inserting hashes"
-                    );
-                }
+                log_progress!(
+                    "sync::stages::hashing_account",
+                    index,
+                    total,
+                    last_log,
+                    "Inserting hashes"
+                );

                 let (key, value) = item?;
                 hashed_account_cursor
diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs
index ef070d30c6..0646032935 100644
--- a/crates/stages/stages/src/stages/hashing_storage.rs
+++ b/crates/stages/stages/src/stages/hashing_storage.rs
@@ -1,3 +1,4 @@
+use crate::log_progress;
 use alloy_primitives::{bytes::BufMut, keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -19,6 +20,7 @@ use reth_storage_errors::provider::ProviderResult;
 use std::{
     fmt::Debug,
     sync::mpsc::{self, Receiver},
+    time::Instant,
 };
 use tracing::*;

@@ -117,17 +119,17 @@ where
             collect(&mut channels, &mut collector)?;

-            let total_hashes = collector.len();
-            let interval = (total_hashes / 10).max(1);
+            let total = collector.len();
+            let mut last_log = Instant::now();
             let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
             for (index, item) in collector.iter()?.enumerate() {
-                if index > 0 && index % interval == 0 {
-                    info!(
-                        target: "sync::stages::hashing_storage",
-                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
-                        "Inserting hashes"
-                    );
-                }
+                log_progress!(
+                    "sync::stages::hashing_storage",
+                    index,
+                    total,
+                    last_log,
+                    "Inserting hashes"
+                );

                 let (addr_key, value) = item?;
                 cursor.append_dup(
diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs
index 199e015c2d..eaa2ea01f0 100644
--- a/crates/stages/stages/src/stages/headers.rs
+++ b/crates/stages/stages/src/stages/headers.rs
@@ -1,3 +1,4 @@
+use crate::log_progress;
 use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
 use futures_util::StreamExt;
 use reth_config::config::EtlConfig;
@@ -25,6 +26,7 @@ use reth_storage_errors::provider::ProviderError;
 use std::{
     sync::Arc,
     task::{ready, Context, Poll},
+    time::Instant,
 };
 use tokio::sync::watch;
 use tracing::*;
@@ -95,9 +97,9 @@ where
         provider: &impl DBProvider<Tx: DbTxMut>,
         static_file_provider: StaticFileProvider<Provider::Primitives>,
     ) -> Result<BlockNumber, StageError> {
-        let total_headers = self.header_collector.len();
+        let total = self.header_collector.len();

-        info!(target: "sync::stages::headers", total = total_headers, "Writing headers");
+        info!(target: "sync::stages::headers", total, "Writing headers");

         // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
         // when poll_execute_ready is polled.
@@ -113,13 +115,11 @@ where
         // Although headers were downloaded in reverse order, the collector iterates it in ascending
         // order
         let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
-        let interval = (total_headers / 10).max(1);
+        let mut last_log = Instant::now();
         for (index, header) in self.header_collector.iter()?.enumerate() {
             let (_, header_buf) = header?;

-            if index > 0 && index % interval == 0 && total_headers > 100 {
-                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
-            }
+            log_progress!("sync::stages::headers", index, total, last_log, "Writing headers");

             let sealed_header: SealedHeader =
                 bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
@@ -147,7 +147,7 @@ where
             writer.append_header(&header, td, &header_hash)?;
         }

-        info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");
+        info!(target: "sync::stages::headers", total, "Writing headers hash index");

         let mut cursor_header_numbers =
             provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
@@ -168,9 +168,13 @@ where
         for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;

-            if index > 0 && index % interval == 0 && total_headers > 100 {
-                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
-            }
+            log_progress!(
+                "sync::stages::headers",
+                index,
+                total,
+                last_log,
+                "Writing headers hash index"
+            );

             if first_sync {
                 cursor_header_numbers.append(
diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs
index 60c958abf8..c5ccff54c5 100644
--- a/crates/stages/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/stages/src/stages/tx_lookup.rs
@@ -1,3 +1,4 @@
+use super::utils::LOG_INTERVAL;
 use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
@@ -17,6 +18,7 @@ use reth_stages_api::{
     UnwindInput, UnwindOutput,
 };
 use reth_storage_errors::provider::ProviderError;
+use std::time::Instant;
 use tracing::*;

 /// The transaction lookup stage.
@@ -147,16 +149,18 @@ where
             .cursor_write::<RawTable<tables::TransactionHashNumbers>>()?;

         let total_hashes = hash_collector.len();
-        let interval = (total_hashes / 10).max(1);
+        let mut last_log = Instant::now();
         for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;
-            if index > 0 && index % interval == 0 {
+            let now = Instant::now();
+            if now.duration_since(last_log) >= LOG_INTERVAL {
                 info!(
                     target: "sync::stages::transaction_lookup",
                     ?append_only,
                     progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                     "Inserting hashes"
                 );
+                last_log = now;
             }

             let key = RawKey::<TxHash>::from_vec(hash);
diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs
index caf039faca..af6594cd78 100644
--- a/crates/stages/stages/src/stages/utils.rs
+++ b/crates/stages/stages/src/stages/utils.rs
@@ -12,12 +12,36 @@ use reth_db_api::{
 use reth_etl::Collector;
 use reth_provider::DBProvider;
 use reth_stages_api::StageError;
-use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
+use std::{
+    collections::HashMap,
+    hash::Hash,
+    ops::RangeBounds,
+    time::{Duration, Instant},
+};
 use tracing::info;

 /// Number of blocks before pushing indices from cache to [`Collector`]
 const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

+/// Log interval for progress.
+pub(crate) const LOG_INTERVAL: Duration = Duration::from_secs(5);
+
+/// Log progress at a regular interval.
+#[macro_export]
+macro_rules! log_progress {
+    ($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
+        let now = std::time::Instant::now();
+        if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
+            info!(
+                target: $target,
+                progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
+                $message
+            );
+            $last_log = now;
+        }
+    }
+}
+
 /// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
 /// [`Collector`].
 ///
@@ -65,18 +89,16 @@ where
     };

     // observability
-    let total_changesets = provider.tx_ref().entries::<CS>()?;
-    let interval = (total_changesets / 1000).max(1);
+    let total = provider.tx_ref().entries::<CS>()?;
+    let mut last_log = Instant::now();

     let mut flush_counter = 0;
     let mut current_block_number = u64::MAX;

-    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
+    for (index, entry) in changeset_cursor.walk_range(range)?.enumerate() {
         let (block_number, key) = partial_key_factory(entry?);
         cache.entry(key).or_default().push(block_number);

-        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
-            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
-        }
+        log_progress!("sync::stages::index_history", index, total, last_log, "Collecting indices");

         // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
         if current_block_number != block_number {
@@ -120,17 +142,15 @@ where
     let mut current_list = Vec::<u64>::new();

     // observability
-    let total_entries = collector.len();
-    let interval = (total_entries / 100).max(1);
+    let total = collector.len();
+    let mut last_log = Instant::now();

     for (index, element) in collector.iter()?.enumerate() {
         let (k, v) = element?;
         let sharded_key = decode_key(k)?;
         let new_list = BlockNumberList::decompress_owned(v)?;

-        if index > 0 && index % interval == 0 && total_entries > 100 {
-            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
-        }
+        log_progress!("sync::stages::index_history", index, total, last_log, "Writing indices");

         // `AccountsHistory`: `Address`.
         // `StorageHistory`: `Address.StorageKey`.
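
A note on the pattern, since the macro hides the mechanics: every call site keeps a `last_log: Instant` alive across its loop, and the macro only emits when at least `LOG_INTERVAL` (5s) has elapsed since the previous emission. Below is a minimal, self-contained sketch of what one expanded call site looks like, assuming only the `tracing` and `tracing-subscriber` crates; the function name `run_insert_loop` and the `sync::stages::example` target are illustrative, not part of this patch.

```rust
use std::time::{Duration, Instant};
use tracing::info;

// Mirrors `LOG_INTERVAL` from `stages/utils.rs`.
const LOG_INTERVAL: Duration = Duration::from_secs(5);

fn run_insert_loop(total: usize) {
    let mut last_log = Instant::now();
    for index in 0..total {
        // This block is what `log_progress!(target, index, total, last_log, msg)`
        // expands to: at most one progress line per LOG_INTERVAL, regardless of
        // how many items the loop processes per second.
        let now = Instant::now();
        if now.duration_since(last_log) >= LOG_INTERVAL {
            info!(
                target: "sync::stages::example",
                progress = %format!("{:.2}%", (index as f64 / total as f64) * 100.0),
                "Inserting hashes"
            );
            last_log = now;
        }

        // ... per-item work (cursor appends, etc.) goes here ...
    }
}

fn main() {
    tracing_subscriber::fmt().init();
    run_insert_loop(10_000_000);
}
```

The time-based throttle behaves the same at every scale, which is what lets the patch delete the old guards: huge collections log every five seconds instead of only ten times total, loops that finish in under five seconds stay silent, and the `index > 0` / `total_headers > 100` special cases disappear. `tx_lookup.rs` inlines the same throttle rather than calling the macro because it logs an extra `?append_only` field that the five-argument matcher cannot express.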
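One hygiene caveat: the exported macro body calls `info!` unqualified, so it only compiles at call sites that already have `use tracing::*;` (or `use tracing::info;`) in scope, which every file touched here does. Similarly, `#[macro_export]` places `log_progress!` at the crate root, but it reads the `pub(crate)` `$crate::stages::utils::LOG_INTERVAL`, so the macro is usable only inside this crate despite being exported. A sketch of a more self-contained variant, shown as an alternative rather than what this patch ships:

```rust
/// Log progress at a regular interval.
#[macro_export]
macro_rules! log_progress {
    ($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
        let now = std::time::Instant::now();
        if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
            // Path-qualified so call sites no longer need `use tracing::*;`.
            tracing::info!(
                target: $target,
                progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
                $message
            );
            $last_log = now;
        }
    };
}
```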