From 176496189dbd5300fc4c778aca671a55345bd386 Mon Sep 17 00:00:00 2001
From: Matthias Seitz
Date: Mon, 14 Oct 2024 10:29:58 +0200
Subject: [PATCH] Revert "chore(stages): reduce the progress logging " (#11698)

---
 .../stages/src/stages/hashing_account.rs      | 20 ++++-----
 .../stages/src/stages/hashing_storage.rs      | 20 ++++-----
 crates/stages/stages/src/stages/headers.rs    | 24 +++++-----
 crates/stages/stages/src/stages/tx_lookup.rs  |  8 +---
 crates/stages/stages/src/stages/utils.rs      | 44 +++++--------------
 5 files changed, 42 insertions(+), 74 deletions(-)

diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs
index 2fdccd7837..14afb37d81 100644
--- a/crates/stages/stages/src/stages/hashing_account.rs
+++ b/crates/stages/stages/src/stages/hashing_account.rs
@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -19,7 +18,6 @@ use std::{
     fmt::Debug,
     ops::{Range, RangeInclusive},
     sync::mpsc::{self, Receiver},
-    time::Instant,
 };
 use tracing::*;

@@ -188,16 +186,16 @@ where
         let mut hashed_account_cursor =
             tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

-        let total = collector.len();
-        let mut last_log = Instant::now();
+        let total_hashes = collector.len();
+        let interval = (total_hashes / 10).max(1);
         for (index, item) in collector.iter()?.enumerate() {
-            log_progress!(
-                "sync::stages::hashing_account",
-                index,
-                total,
-                last_log,
-                "Inserting hashes"
-            );
+            if index > 0 && index % interval == 0 {
+                info!(
+                    target: "sync::stages::hashing_account",
+                    progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                    "Inserting hashes"
+                );
+            }

             let (key, value) = item?;
             hashed_account_cursor
diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs
index 0646032935..ef070d30c6 100644
--- a/crates/stages/stages/src/stages/hashing_storage.rs
+++ b/crates/stages/stages/src/stages/hashing_storage.rs
@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{bytes::BufMut, keccak256, B256};
 use itertools::Itertools;
 use reth_config::config::{EtlConfig, HashingConfig};
@@ -20,7 +19,6 @@ use reth_storage_errors::provider::ProviderResult;
 use std::{
     fmt::Debug,
     sync::mpsc::{self, Receiver},
-    time::Instant,
 };
 use tracing::*;

@@ -119,17 +117,17 @@ where
         collect(&mut channels, &mut collector)?;

-        let total = collector.len();
-        let mut last_log = Instant::now();
+        let total_hashes = collector.len();
+        let interval = (total_hashes / 10).max(1);
         let mut cursor = tx.cursor_dup_write::<tables::HashedStorages>()?;
         for (index, item) in collector.iter()?.enumerate() {
-            log_progress!(
-                "sync::stages::hashing_storage",
-                index,
-                total,
-                last_log,
-                "Inserting hashes"
-            );
+            if index > 0 && index % interval == 0 {
+                info!(
+                    target: "sync::stages::hashing_storage",
+                    progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
+                    "Inserting hashes"
+                );
+            }

             let (addr_key, value) = item?;
             cursor.append_dup(
diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs
index eaa2ea01f0..199e015c2d 100644
--- a/crates/stages/stages/src/stages/headers.rs
+++ b/crates/stages/stages/src/stages/headers.rs
@@ -1,4 +1,3 @@
-use crate::log_progress;
 use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
 use futures_util::StreamExt;
 use reth_config::config::EtlConfig;
@@ -26,7 +25,6 @@ use reth_storage_errors::provider::ProviderError;
 use std::{
     sync::Arc,
     task::{ready, Context, Poll},
-    time::Instant,
 };
 use tokio::sync::watch;
 use tracing::*;

@@ -97,9 +95,9 @@ where
         provider: &impl DBProvider,
         static_file_provider: StaticFileProvider,
     ) -> Result<BlockNumber, StageError> {
-        let total = self.header_collector.len();
+        let total_headers = self.header_collector.len();

-        info!(target: "sync::stages::headers", total, "Writing headers");
+        info!(target: "sync::stages::headers", total = total_headers, "Writing headers");

         // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
         // when poll_execute_ready is polled.
@@ -115,11 +113,13 @@ where
         // Although headers were downloaded in reverse order, the collector iterates it in ascending
         // order
         let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
-        let mut last_log = Instant::now();
+        let interval = (total_headers / 10).max(1);
         for (index, header) in self.header_collector.iter()?.enumerate() {
             let (_, header_buf) = header?;

-            log_progress!("sync::stages::headers", index, total, last_log, "Writing headers");
+            if index > 0 && index % interval == 0 && total_headers > 100 {
+                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers");
+            }

             let sealed_header: SealedHeader =
                 bincode::deserialize::<serde_bincode_compat::SealedHeader<'_>>(&header_buf)
@@ -147,7 +147,7 @@ where
             writer.append_header(&header, td, &header_hash)?;
         }

-        info!(target: "sync::stages::headers", total, "Writing headers hash index");
+        info!(target: "sync::stages::headers", total = total_headers, "Writing headers hash index");

         let mut cursor_header_numbers =
             provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
@@ -168,13 +168,9 @@ where
         for (index, hash_to_number) in self.hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;

-            log_progress!(
-                "sync::stages::headers",
-                index,
-                total,
-                last_log,
-                "Writing headers hash index"
-            );
+            if index > 0 && index % interval == 0 && total_headers > 100 {
+                info!(target: "sync::stages::headers", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
+            }

             if first_sync {
                 cursor_header_numbers.append(
diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs
index c5ccff54c5..60c958abf8 100644
--- a/crates/stages/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/stages/src/stages/tx_lookup.rs
@@ -1,4 +1,3 @@
-use super::utils::LOG_INTERVAL;
 use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
@@ -18,7 +17,6 @@ use reth_stages_api::{
     UnwindInput, UnwindOutput,
 };
 use reth_storage_errors::provider::ProviderError;
-use std::time::Instant;
 use tracing::*;

 /// The transaction lookup stage.
@@ -149,18 +147,16 @@ where
             .cursor_write::<RawTable<tables::TransactionHashNumbers>>()?;

         let total_hashes = hash_collector.len();
-        let mut last_log = Instant::now();
+        let interval = (total_hashes / 10).max(1);
         for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
             let (hash, number) = hash_to_number?;
-            let now = Instant::now();
-            if now.duration_since(last_log) >= LOG_INTERVAL {
+            if index > 0 && index % interval == 0 {
                 info!(
                     target: "sync::stages::transaction_lookup",
                     ?append_only,
                     progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                     "Inserting hashes"
                 );
-                last_log = now;
             }

             let key = RawKey::<TxHash>::from_vec(hash);
diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs
index af6594cd78..caf039faca 100644
--- a/crates/stages/stages/src/stages/utils.rs
+++ b/crates/stages/stages/src/stages/utils.rs
@@ -12,36 +12,12 @@ use reth_db_api::{
 use reth_etl::Collector;
 use reth_provider::DBProvider;
 use reth_stages_api::StageError;
-use std::{
-    collections::HashMap,
-    hash::Hash,
-    ops::RangeBounds,
-    time::{Duration, Instant},
-};
+use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
 use tracing::info;

 /// Number of blocks before pushing indices from cache to [`Collector`]
 const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

-/// Log interval for progress.
-pub(crate) const LOG_INTERVAL: Duration = Duration::from_secs(5);
-
-/// Log progress at a regular interval.
-#[macro_export]
-macro_rules! log_progress {
-    ($target:expr, $index:expr, $total:expr, $last_log:expr, $message:expr) => {
-        let now = std::time::Instant::now();
-        if now.duration_since($last_log) >= $crate::stages::utils::LOG_INTERVAL {
-            info!(
-                target: $target,
-                progress = %format!("{:.2}%", ($index as f64 / $total as f64) * 100.0),
-                $message
-            );
-            $last_log = now;
-        }
-    }
-}
-
 /// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
 /// [`Collector`].
 ///
@@ -89,16 +65,18 @@ where
     };

     // observability
-    let total = provider.tx_ref().entries::<CS>()?;
-    let mut last_log = Instant::now();
+    let total_changesets = provider.tx_ref().entries::<CS>()?;
+    let interval = (total_changesets / 1000).max(1);

     let mut flush_counter = 0;
     let mut current_block_number = u64::MAX;
-    for (index, entry) in changeset_cursor.walk_range(range)?.enumerate() {
+    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
         let (block_number, key) = partial_key_factory(entry?);
         cache.entry(key).or_default().push(block_number);

-        log_progress!("sync::stages::index_history", index, total, last_log, "Collecting indices");
+        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
+            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
+        }

         // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
         if current_block_number != block_number {
@@ -142,15 +120,17 @@ where
     let mut current_list = Vec::<u64>::new();

     // observability
-    let total = collector.len();
-    let mut last_log = Instant::now();
+    let total_entries = collector.len();
+    let interval = (total_entries / 100).max(1);

     for (index, element) in collector.iter()?.enumerate() {
         let (k, v) = element?;
         let sharded_key = decode_key(k)?;
         let new_list = BlockNumberList::decompress_owned(v)?;

-        log_progress!("sync::stages::index_history", index, total, last_log, "Writing indices");
+        if index > 0 && index % interval == 0 && total_entries > 100 {
+            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
+        }

         // AccountsHistory: `Address`.
         // StorageHistory: `Address.StorageKey`.
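
Every hunk above restores the same pattern: instead of throttling progress output by wall-clock time (the deleted `log_progress!` macro logged at most once per `LOG_INTERVAL` of five seconds), each stage logs every `total / N` processed items, so a run emits a roughly fixed number of progress lines regardless of how long it takes. Below is a minimal, self-contained sketch of the two variants for comparison; it uses the `tracing` crate like the code in the patch, but the function names, target strings, and item type are illustrative stand-ins, not reth APIs.

```rust
use std::time::{Duration, Instant};
use tracing::info;

/// Count-based throttling (the style this revert restores): emit roughly
/// ten progress lines per run by logging every `total / 10` items.
fn process_count_based(items: &[Vec<u8>]) {
    let total = items.len();
    // `.max(1)` guards against `total < 10`, which would make the interval
    // zero and turn `index % interval` into a modulo-by-zero panic.
    let interval = (total / 10).max(1);
    for (index, _item) in items.iter().enumerate() {
        if index > 0 && index % interval == 0 {
            info!(
                target: "example::count_based",
                progress = %format!("{:.2}%", (index as f64 / total as f64) * 100.0),
                "Inserting items"
            );
        }
        // ... do the actual per-item work here ...
    }
}

/// Time-based throttling (the style this revert removes): emit at most one
/// progress line per `LOG_INTERVAL` of elapsed wall-clock time.
const LOG_INTERVAL: Duration = Duration::from_secs(5);

fn process_time_based(items: &[Vec<u8>]) {
    let total = items.len();
    let mut last_log = Instant::now();
    for (index, _item) in items.iter().enumerate() {
        let now = Instant::now();
        if now.duration_since(last_log) >= LOG_INTERVAL {
            info!(
                target: "example::time_based",
                progress = %format!("{:.2}%", (index as f64 / total as f64) * 100.0),
                "Inserting items"
            );
            last_log = now;
        }
        // ... do the actual per-item work here ...
    }
}
```

Note the guards the restored code adds for small workloads: header and index writing only log when `total_headers > 100` or `total_entries > 100`, and changeset collection only when `total_changesets > 1000`, so short runs stay quiet even though the throttling is count-based.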