From 068220bb0ad3c626e2323dda91f7423e9b375278 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 15 Jul 2024 17:47:17 +0100 Subject: [PATCH] feat(pruner, stages): logs for Prune stage (#9520) --- crates/engine/tree/src/database.rs | 8 +- crates/engine/tree/src/persistence.rs | 6 +- crates/prune/prune/src/pruner.rs | 44 +++--- crates/prune/prune/src/segments/mod.rs | 57 +------- crates/prune/prune/src/segments/receipts.rs | 21 ++- .../prune/src/segments/static_file/headers.rs | 29 ++-- .../src/segments/static_file/receipts.rs | 6 +- .../src/segments/static_file/transactions.rs | 21 +-- .../src/segments/user/account_history.rs | 24 ++-- .../prune/prune/src/segments/user/receipts.rs | 6 +- .../src/segments/user/receipts_by_logs.rs | 8 +- .../src/segments/user/sender_recovery.rs | 18 +-- .../src/segments/user/storage_history.rs | 22 ++- .../src/segments/user/transaction_lookup.rs | 18 +-- crates/prune/types/src/lib.rs | 67 +-------- crates/prune/types/src/pruner.rs | 130 ++++++++++++++++++ crates/stages/stages/src/stages/prune.rs | 3 +- 17 files changed, 260 insertions(+), 228 deletions(-) create mode 100644 crates/prune/types/src/pruner.rs diff --git a/crates/engine/tree/src/database.rs b/crates/engine/tree/src/database.rs index 063b48aca3..81a9c5234b 100644 --- a/crates/engine/tree/src/database.rs +++ b/crates/engine/tree/src/database.rs @@ -8,7 +8,7 @@ use reth_provider::{ bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, }; -use reth_prune::{PruneProgress, Pruner}; +use reth_prune::{Pruner, PrunerOutput}; use reth_stages_types::{StageCheckpoint, StageId}; use std::sync::mpsc::{Receiver, SendError, Sender}; use tokio::sync::oneshot; @@ -118,7 +118,7 @@ impl DatabaseService { /// Prunes block data before the given block hash according to the configured prune /// configuration. - fn prune_before(&mut self, block_num: u64) -> PruneProgress { + fn prune_before(&mut self, block_num: u64) -> PrunerOutput { // TODO: doing this properly depends on pruner segment changes self.pruner.run(block_num).expect("todo: handle errors") } @@ -201,7 +201,7 @@ pub enum DatabaseAction { /// Prune associated block data before the given block number, according to already-configured /// prune modes. - PruneBefore((u64, oneshot::Sender)), + PruneBefore((u64, oneshot::Sender)), } /// A handle to the database service @@ -246,7 +246,7 @@ impl DatabaseServiceHandle { /// Tells the database service to remove block data before the given hash, according to the /// configured prune config. - pub async fn prune_before(&self, block_num: u64) -> PruneProgress { + pub async fn prune_before(&self, block_num: u64) -> PrunerOutput { let (tx, rx) = oneshot::channel(); self.sender .send(DatabaseAction::PruneBefore((block_num, tx))) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index a51906e78f..26409ad309 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -8,7 +8,7 @@ use crate::{ use reth_db::Database; use reth_primitives::{SealedBlock, B256, U256}; use reth_provider::ProviderFactory; -use reth_prune::{PruneProgress, Pruner}; +use reth_prune::{Pruner, PrunerOutput}; use std::sync::{ mpsc::{SendError, Sender}, Arc, @@ -36,7 +36,7 @@ pub enum PersistenceAction { /// Prune associated block data before the given block number, according to already-configured /// prune modes. - PruneBefore((u64, oneshot::Sender)), + PruneBefore((u64, oneshot::Sender)), } /// An error type for when there is a [`SendError`] while sending an action to one of the services. @@ -172,7 +172,7 @@ impl PersistenceHandle { /// Tells the persistence service to remove block data before the given hash, according to the /// configured prune config. - pub async fn prune_before(&self, block_num: u64) -> PruneProgress { + pub async fn prune_before(&self, block_num: u64) -> PrunerOutput { let (tx, rx) = oneshot::channel(); self.send_action(PersistenceAction::PruneBefore((block_num, tx))) .expect("should be able to send"); diff --git a/crates/prune/prune/src/pruner.rs b/crates/prune/prune/src/pruner.rs index 479022eb10..dea50406a5 100644 --- a/crates/prune/prune/src/pruner.rs +++ b/crates/prune/prune/src/pruner.rs @@ -8,14 +8,14 @@ use alloy_primitives::BlockNumber; use reth_db_api::database::Database; use reth_exex_types::FinishedExExHeight; use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader}; -use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment}; +use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment, PrunerOutput}; use reth_tokio_util::{EventSender, EventStream}; use std::time::{Duration, Instant}; use tokio::sync::watch; use tracing::debug; /// Result of [`Pruner::run`] execution. -pub type PrunerResult = Result; +pub type PrunerResult = Result; /// The pruner type itself with the result of [`Pruner::run`] pub type PrunerWithResult = (Pruner, PrunerResult); @@ -107,13 +107,13 @@ impl Pruner { let Some(tip_block_number) = self.adjust_tip_block_number_to_finished_exex_height(tip_block_number) else { - return Ok(PruneProgress::Finished) + return Ok(PruneProgress::Finished.into()) }; if tip_block_number == 0 { self.previous_tip_block_number = Some(tip_block_number); debug!(target: "pruner", %tip_block_number, "Nothing to prune yet"); - return Ok(PruneProgress::Finished) + return Ok(PruneProgress::Finished.into()) } self.event_sender.notify(PrunerEvent::Started { tip_block_number }); @@ -126,7 +126,7 @@ impl Pruner { limiter = limiter.set_time_limit(timeout); }; - let (stats, deleted_entries, progress) = + let (stats, deleted_entries, output) = self.prune_segments(provider, tip_block_number, &mut limiter)?; self.previous_tip_block_number = Some(tip_block_number); @@ -134,7 +134,7 @@ impl Pruner { let elapsed = start.elapsed(); self.metrics.duration_seconds.record(elapsed); - let message = match progress { + let message = match output.progress { PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune", PruneProgress::Finished => "Pruner finished", }; @@ -145,14 +145,14 @@ impl Pruner { ?elapsed, ?deleted_entries, ?limiter, - ?progress, + ?output, ?stats, "{message}", ); self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats }); - Ok(progress) + Ok(output) } /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to @@ -165,10 +165,13 @@ impl Pruner { provider: &DatabaseProviderRW, tip_block_number: BlockNumber, limiter: &mut PruneLimiter, - ) -> Result<(PrunerStats, usize, PruneProgress), PrunerError> { + ) -> Result<(PrunerStats, usize, PrunerOutput), PrunerError> { let mut stats = PrunerStats::new(); let mut pruned = 0; - let mut progress = PruneProgress::Finished; + let mut output = PrunerOutput { + progress: PruneProgress::Finished, + segments: Vec::with_capacity(self.segments.len()), + }; for segment in &self.segments { if limiter.is_limit_reached() { @@ -194,11 +197,11 @@ impl Pruner { let segment_start = Instant::now(); let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?; - let output = segment.prune( + let segment_output = segment.prune( provider, PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() }, )?; - if let Some(checkpoint) = output.checkpoint { + if let Some(checkpoint) = segment_output.checkpoint { segment .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?; } @@ -207,7 +210,7 @@ impl Pruner { .duration_seconds .record(segment_start.elapsed()); if let Some(highest_pruned_block) = - output.checkpoint.and_then(|checkpoint| checkpoint.block_number) + segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number) { self.metrics .get_prune_segment_metrics(segment.segment()) @@ -215,7 +218,8 @@ impl Pruner { .set(highest_pruned_block as f64); } - progress = output.progress; + output.progress = segment_output.progress; + output.segments.push((segment.segment(), segment_output)); debug!( target: "pruner", @@ -223,21 +227,21 @@ impl Pruner { purpose = ?segment.purpose(), %to_block, ?prune_mode, - %output.pruned, + %segment_output.pruned, "Segment pruning finished" ); - if output.pruned > 0 { - limiter.increment_deleted_entries_count_by(output.pruned); - pruned += output.pruned; - stats.push((segment.segment(), output.pruned, output.progress)); + if segment_output.pruned > 0 { + limiter.increment_deleted_entries_count_by(segment_output.pruned); + pruned += segment_output.pruned; + stats.push((segment.segment(), segment_output.pruned, segment_output.progress)); } } else { debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment"); } } - Ok((stats, pruned, progress)) + Ok((stats, pruned, output)) } /// Returns `true` if the pruning is needed at the provided tip block number. diff --git a/crates/prune/prune/src/segments/mod.rs b/crates/prune/prune/src/segments/mod.rs index 8700ac8a0d..d9f4538065 100644 --- a/crates/prune/prune/src/segments/mod.rs +++ b/crates/prune/prune/src/segments/mod.rs @@ -10,8 +10,7 @@ use reth_provider::{ errors::provider::ProviderResult, BlockReader, DatabaseProviderRW, PruneCheckpointWriter, }; use reth_prune_types::{ - PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, - PruneSegment, + PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput, }; pub use set::SegmentSet; pub use static_file::{ @@ -29,9 +28,9 @@ pub use user::{ /// /// Segments are called from [Pruner](crate::Pruner) with the following lifecycle: /// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`]. -/// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`PruneOutput`], call +/// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`SegmentOutput`], call /// [`Segment::save_checkpoint`]. -/// 3. Subtract `pruned` of [`PruneOutput`] from `delete_limit` of next [`PruneInput`]. +/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`]. pub trait Segment: Debug + Send + Sync { /// Segment of data that's pruned. fn segment(&self) -> PruneSegment; @@ -47,7 +46,7 @@ pub trait Segment: Debug + Send + Sync { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result; + ) -> Result; /// Save checkpoint for [`Self::segment`] to the database. fn save_checkpoint( @@ -148,51 +147,3 @@ impl PruneInput { .unwrap_or(0) } } - -/// Segment pruning output, see [`Segment::prune`]. -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub struct PruneOutput { - pub(crate) progress: PruneProgress, - /// Number of entries pruned, i.e. deleted from the database. - pub(crate) pruned: usize, - /// Pruning checkpoint to save to database, if any. - pub(crate) checkpoint: Option, -} - -impl PruneOutput { - /// Returns a [`PruneOutput`] with `done = true`, `pruned = 0` and `checkpoint = None`. - /// Use when no pruning is needed. - pub(crate) const fn done() -> Self { - Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None } - } - - /// Returns a [`PruneOutput`] with `done = false`, `pruned = 0` and `checkpoint = None`. - /// Use when pruning is needed but cannot be done. - pub(crate) const fn not_done( - reason: PruneInterruptReason, - checkpoint: Option, - ) -> Self { - Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint } - } -} - -#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] -pub(crate) struct PruneOutputCheckpoint { - /// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet. - pub(crate) block_number: Option, - /// Highest pruned transaction number, if applicable. - pub(crate) tx_number: Option, -} - -impl PruneOutputCheckpoint { - /// Converts [`PruneOutputCheckpoint`] to [`PruneCheckpoint`] with the provided [`PruneMode`] - pub(crate) const fn as_prune_checkpoint(&self, prune_mode: PruneMode) -> PruneCheckpoint { - PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode } - } -} - -impl From for PruneOutputCheckpoint { - fn from(checkpoint: PruneCheckpoint) -> Self { - Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number } - } -} diff --git a/crates/prune/prune/src/segments/receipts.rs b/crates/prune/prune/src/segments/receipts.rs index 3483d9e1c2..14723bbfda 100644 --- a/crates/prune/prune/src/segments/receipts.rs +++ b/crates/prune/prune/src/segments/receipts.rs @@ -5,28 +5,27 @@ //! - [`crate::segments::static_file::Receipts`] is responsible for pruning receipts on an archive //! node after static file producer has finished -use crate::{ - segments::{PruneInput, PruneOutput, PruneOutputCheckpoint}, - PrunerError, -}; +use crate::{segments::PruneInput, PrunerError}; use reth_db::tables; use reth_db_api::database::Database; use reth_provider::{ errors::provider::ProviderResult, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider, }; -use reth_prune_types::{PruneCheckpoint, PruneProgress, PruneSegment}; +use reth_prune_types::{ + PruneCheckpoint, PruneProgress, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, +}; use tracing::trace; pub(crate) fn prune( provider: &DatabaseProviderRW, input: PruneInput, -) -> Result { +) -> Result { let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No receipts to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; let tx_range_end = *tx_range.end(); @@ -51,10 +50,10 @@ pub(crate) fn prune( let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: last_pruned_block, tx_number: Some(last_pruned_transaction), }), @@ -76,7 +75,7 @@ pub(crate) fn save_checkpoint( #[cfg(test)] mod tests { - use crate::segments::{PruneInput, PruneOutput}; + use crate::segments::{PruneInput, SegmentOutput}; use alloy_primitives::{BlockNumber, TxNumber, B256}; use assert_matches::assert_matches; use itertools::{ @@ -162,7 +161,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/prune/src/segments/static_file/headers.rs b/crates/prune/prune/src/segments/static_file/headers.rs index a9c26a97ca..450dae3a80 100644 --- a/crates/prune/prune/src/segments/static_file/headers.rs +++ b/crates/prune/prune/src/segments/static_file/headers.rs @@ -1,7 +1,7 @@ use std::num::NonZeroUsize; use crate::{ - segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use alloy_primitives::BlockNumber; @@ -13,7 +13,10 @@ use reth_db::{ transaction::DbTxMut, }; use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW}; -use reth_prune_types::{PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment}; +use reth_prune_types::{ + PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, + SegmentOutputCheckpoint, +}; use reth_static_file_types::StaticFileSegment; use tracing::trace; @@ -50,12 +53,12 @@ impl Segment for Headers { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let (block_range_start, block_range_end) = match input.get_next_block_range() { Some(range) => (*range.start(), *range.end()), None => { trace!(target: "pruner", "No headers to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; @@ -93,10 +96,10 @@ impl Segment for Headers { let done = last_pruned_block.map_or(false, |block| block == block_range_end); let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: last_pruned_block, tx_number: None, }), @@ -193,8 +196,7 @@ where #[cfg(test)] mod tests { use crate::segments::{ - static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneOutput, - PruneOutputCheckpoint, Segment, + static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput, }; use alloy_primitives::{BlockNumber, B256, U256}; use assert_matches::assert_matches; @@ -202,7 +204,8 @@ mod tests { use reth_db_api::transaction::DbTx; use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory}; use reth_prune_types::{ - PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, + PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, SegmentOutputCheckpoint, }; use reth_stages::test_utils::TestStageDB; use reth_testing_utils::{generators, generators::random_header_range}; @@ -258,12 +261,12 @@ mod tests { expected_prune_progress=?expected_result.0, expected_pruned=?expected_result.1, result=?result, - "PruneOutput" + "SegmentOutput" ); assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); provider @@ -327,9 +330,9 @@ mod tests { let result = segment.prune(&provider, input).unwrap(); assert_eq!( result, - PruneOutput::not_done( + SegmentOutput::not_done( PruneInterruptReason::DeletedEntriesLimitReached, - Some(PruneOutputCheckpoint::default()) + Some(SegmentOutputCheckpoint::default()) ) ); } diff --git a/crates/prune/prune/src/segments/static_file/receipts.rs b/crates/prune/prune/src/segments/static_file/receipts.rs index ea616c6ab0..e84f7df443 100644 --- a/crates/prune/prune/src/segments/static_file/receipts.rs +++ b/crates/prune/prune/src/segments/static_file/receipts.rs @@ -1,12 +1,12 @@ use crate::{ - segments::{PruneInput, PruneOutput, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use reth_db_api::database::Database; use reth_provider::{ errors::provider::ProviderResult, providers::StaticFileProvider, DatabaseProviderRW, }; -use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; +use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput}; use reth_static_file_types::StaticFileSegment; #[derive(Debug)] @@ -39,7 +39,7 @@ impl Segment for Receipts { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { crate::segments::receipts::prune(provider, input) } diff --git a/crates/prune/prune/src/segments/static_file/transactions.rs b/crates/prune/prune/src/segments/static_file/transactions.rs index 1da08806f0..5e42519e55 100644 --- a/crates/prune/prune/src/segments/static_file/transactions.rs +++ b/crates/prune/prune/src/segments/static_file/transactions.rs @@ -1,11 +1,13 @@ use crate::{ - segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use reth_db::tables; use reth_db_api::database::Database; use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW, TransactionsProvider}; -use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment}; +use reth_prune_types::{ + PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, +}; use reth_static_file_types::StaticFileSegment; use tracing::trace; @@ -39,12 +41,12 @@ impl Segment for Transactions { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No transactions to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; @@ -68,10 +70,10 @@ impl Segment for Transactions { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: last_pruned_block, tx_number: Some(last_pruned_transaction), }), @@ -81,7 +83,7 @@ impl Segment for Transactions { #[cfg(test)] mod tests { - use crate::segments::{PruneInput, PruneOutput, Segment}; + use crate::segments::{PruneInput, Segment}; use alloy_primitives::{BlockNumber, TxNumber, B256}; use assert_matches::assert_matches; use itertools::{ @@ -91,7 +93,8 @@ mod tests { use reth_db::tables; use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory}; use reth_prune_types::{ - PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment, + PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, SegmentOutput, }; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::{generators, generators::random_block_range}; @@ -140,7 +143,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index bbf2a155eb..5b93f37231 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -1,8 +1,5 @@ use crate::{ - segments::{ - user::history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, - Segment, - }, + segments::{user::history::prune_history_indices, PruneInput, Segment}, PrunerError, }; use itertools::Itertools; @@ -10,7 +7,8 @@ use reth_db::tables; use reth_db_api::{database::Database, models::ShardedKey}; use reth_provider::DatabaseProviderRW; use reth_prune_types::{ - PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, + PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, + SegmentOutputCheckpoint, }; use rustc_hash::FxHashMap; use tracing::{instrument, trace}; @@ -50,12 +48,12 @@ impl Segment for AccountHistory { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let range = match input.get_next_block_range() { Some(range) => range, None => { trace!(target: "pruner", "No account history to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; let range_end = *range.end(); @@ -66,9 +64,9 @@ impl Segment for AccountHistory { input.limiter }; if limiter.is_limit_reached() { - return Ok(PruneOutput::not_done( + return Ok(SegmentOutput::not_done( PruneInterruptReason::new(&limiter), - input.previous_checkpoint.map(|checkpoint| checkpoint.into()), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), )) } @@ -117,10 +115,10 @@ impl Segment for AccountHistory { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned: pruned_changesets + outcomes.deleted, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: Some(last_changeset_pruned_block), tx_number: None, }), @@ -132,7 +130,7 @@ impl Segment for AccountHistory { mod tests { use crate::segments::{ user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput, - PruneOutput, Segment, + Segment, SegmentOutput, }; use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; @@ -208,7 +206,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/prune/src/segments/user/receipts.rs b/crates/prune/prune/src/segments/user/receipts.rs index 0105e0c2fd..7185affd0f 100644 --- a/crates/prune/prune/src/segments/user/receipts.rs +++ b/crates/prune/prune/src/segments/user/receipts.rs @@ -1,10 +1,10 @@ use crate::{ - segments::{PruneInput, PruneOutput, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use reth_db_api::database::Database; use reth_provider::{errors::provider::ProviderResult, DatabaseProviderRW}; -use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; +use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput}; use tracing::instrument; #[derive(Debug)] @@ -36,7 +36,7 @@ impl Segment for Receipts { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { crate::segments::receipts::prune(provider, input) } diff --git a/crates/prune/prune/src/segments/user/receipts_by_logs.rs b/crates/prune/prune/src/segments/user/receipts_by_logs.rs index 24940d1510..a567a01ad8 100644 --- a/crates/prune/prune/src/segments/user/receipts_by_logs.rs +++ b/crates/prune/prune/src/segments/user/receipts_by_logs.rs @@ -1,5 +1,5 @@ use crate::{ - segments::{PruneInput, PruneOutput, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use reth_db::tables; @@ -7,7 +7,7 @@ use reth_db_api::database::Database; use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; use reth_prune_types::{ PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, - MINIMUM_PRUNING_DISTANCE, + SegmentOutput, MINIMUM_PRUNING_DISTANCE, }; use tracing::{instrument, trace}; @@ -40,7 +40,7 @@ impl Segment for ReceiptsByLogs { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { // Contract log filtering removes every receipt possible except the ones in the list. So, // for the other receipts it's as if they had a `PruneMode::Distance()` of // `MINIMUM_PRUNING_DISTANCE`. @@ -213,7 +213,7 @@ impl Segment for ReceiptsByLogs { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { progress, pruned, checkpoint: None }) + Ok(SegmentOutput { progress, pruned, checkpoint: None }) } } diff --git a/crates/prune/prune/src/segments/user/sender_recovery.rs b/crates/prune/prune/src/segments/user/sender_recovery.rs index 695147efa8..1ba48d635f 100644 --- a/crates/prune/prune/src/segments/user/sender_recovery.rs +++ b/crates/prune/prune/src/segments/user/sender_recovery.rs @@ -1,11 +1,13 @@ use crate::{ - segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + segments::{PruneInput, Segment}, PrunerError, }; use reth_db::tables; use reth_db_api::database::Database; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; -use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment}; +use reth_prune_types::{ + PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, +}; use tracing::{instrument, trace}; #[derive(Debug)] @@ -37,12 +39,12 @@ impl Segment for SenderRecovery { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let tx_range = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No transaction senders to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; let tx_range_end = *tx_range.end(); @@ -67,10 +69,10 @@ impl Segment for SenderRecovery { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: last_pruned_block, tx_number: Some(last_pruned_transaction), }), @@ -80,7 +82,7 @@ impl Segment for SenderRecovery { #[cfg(test)] mod tests { - use crate::segments::{PruneInput, PruneOutput, Segment, SenderRecovery}; + use crate::segments::{PruneInput, Segment, SegmentOutput, SenderRecovery}; use alloy_primitives::{BlockNumber, TxNumber, B256}; use assert_matches::assert_matches; use itertools::{ @@ -179,7 +181,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index a400499c6f..8c5dd99cce 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -1,8 +1,5 @@ use crate::{ - segments::{ - user::history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, - Segment, - }, + segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput}, PrunerError, }; use itertools::Itertools; @@ -14,6 +11,7 @@ use reth_db_api::{ use reth_provider::DatabaseProviderRW; use reth_prune_types::{ PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, + SegmentOutputCheckpoint, }; use rustc_hash::FxHashMap; use tracing::{instrument, trace}; @@ -53,12 +51,12 @@ impl Segment for StorageHistory { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let range = match input.get_next_block_range() { Some(range) => range, None => { trace!(target: "pruner", "No storage history to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } }; let range_end = *range.end(); @@ -69,9 +67,9 @@ impl Segment for StorageHistory { input.limiter }; if limiter.is_limit_reached() { - return Ok(PruneOutput::not_done( + return Ok(SegmentOutput::not_done( PruneInterruptReason::new(&limiter), - input.previous_checkpoint.map(|checkpoint| checkpoint.into()), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), )) } @@ -125,10 +123,10 @@ impl Segment for StorageHistory { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned: pruned_changesets + outcomes.deleted, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: Some(last_changeset_pruned_block), tx_number: None, }), @@ -139,7 +137,7 @@ impl Segment for StorageHistory { #[cfg(test)] mod tests { use crate::segments::{ - user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneOutput, Segment, + user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput, StorageHistory, }; use alloy_primitives::{BlockNumber, B256}; @@ -215,7 +213,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/prune/src/segments/user/transaction_lookup.rs b/crates/prune/prune/src/segments/user/transaction_lookup.rs index b7889cfc64..1453873cf3 100644 --- a/crates/prune/prune/src/segments/user/transaction_lookup.rs +++ b/crates/prune/prune/src/segments/user/transaction_lookup.rs @@ -1,12 +1,14 @@ use crate::{ - segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + segments::{PruneInput, Segment, SegmentOutput}, PrunerError, }; use rayon::prelude::*; use reth_db::tables; use reth_db_api::database::Database; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; -use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment}; +use reth_prune_types::{ + PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint, +}; use tracing::{instrument, trace}; #[derive(Debug)] @@ -38,12 +40,12 @@ impl Segment for TransactionLookup { &self, provider: &DatabaseProviderRW, input: PruneInput, - ) -> Result { + ) -> Result { let (start, end) = match input.get_next_tx_num_range(provider)? { Some(range) => range, None => { trace!(target: "pruner", "No transaction lookup entries to prune"); - return Ok(PruneOutput::done()) + return Ok(SegmentOutput::done()) } } .into_inner(); @@ -94,10 +96,10 @@ impl Segment for TransactionLookup { let progress = PruneProgress::new(done, &limiter); - Ok(PruneOutput { + Ok(SegmentOutput { progress, pruned, - checkpoint: Some(PruneOutputCheckpoint { + checkpoint: Some(SegmentOutputCheckpoint { block_number: last_pruned_block, tx_number: Some(last_pruned_transaction), }), @@ -107,7 +109,7 @@ impl Segment for TransactionLookup { #[cfg(test)] mod tests { - use crate::segments::{PruneInput, PruneOutput, Segment, TransactionLookup}; + use crate::segments::{PruneInput, Segment, SegmentOutput, TransactionLookup}; use alloy_primitives::{BlockNumber, TxNumber, B256}; use assert_matches::assert_matches; use itertools::{ @@ -204,7 +206,7 @@ mod tests { assert_matches!( result, - PruneOutput {progress, pruned, checkpoint: Some(_)} + SegmentOutput {progress, pruned, checkpoint: Some(_)} if (progress, pruned) == expected_result ); diff --git a/crates/prune/types/src/lib.rs b/crates/prune/types/src/lib.rs index 82563010f1..6e06d6fc5d 100644 --- a/crates/prune/types/src/lib.rs +++ b/crates/prune/types/src/lib.rs @@ -11,12 +11,16 @@ mod checkpoint; mod limiter; mod mode; +mod pruner; mod segment; mod target; pub use checkpoint::PruneCheckpoint; pub use limiter::PruneLimiter; pub use mode::PruneMode; +pub use pruner::{ + PruneInterruptReason, PruneProgress, PrunerOutput, SegmentOutput, SegmentOutputCheckpoint, +}; pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -99,66 +103,3 @@ impl ReceiptsLogPruneConfig { Ok(lowest.map(|lowest| lowest.max(pruned_block))) } } - -/// Progress of pruning. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum PruneProgress { - /// There is more data to prune. - HasMoreData(PruneInterruptReason), - /// Pruning has been finished. - Finished, -} - -/// Reason for interrupting a prune run. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum PruneInterruptReason { - /// Prune run timed out. - Timeout, - /// Limit on the number of deleted entries (rows in the database) per prune run was reached. - DeletedEntriesLimitReached, - /// Unknown reason for stopping prune run. - Unknown, -} - -impl PruneInterruptReason { - /// Creates new [`PruneInterruptReason`] based on the [`PruneLimiter`]. - pub fn new(limiter: &PruneLimiter) -> Self { - if limiter.is_time_limit_reached() { - Self::Timeout - } else if limiter.is_deleted_entries_limit_reached() { - Self::DeletedEntriesLimitReached - } else { - Self::Unknown - } - } - - /// Returns `true` if the reason is timeout. - pub const fn is_timeout(&self) -> bool { - matches!(self, Self::Timeout) - } - - /// Returns `true` if the reason is reaching the limit on deleted entries. - pub const fn is_entries_limit_reached(&self) -> bool { - matches!(self, Self::DeletedEntriesLimitReached) - } -} - -impl PruneProgress { - /// Creates new [`PruneProgress`]. - /// - /// If `done == true`, returns [`PruneProgress::Finished`], otherwise - /// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the - /// passed limiter. - pub fn new(done: bool, limiter: &PruneLimiter) -> Self { - if done { - Self::Finished - } else { - Self::HasMoreData(PruneInterruptReason::new(limiter)) - } - } - - /// Returns `true` if prune run is finished. - pub const fn is_finished(&self) -> bool { - matches!(self, Self::Finished) - } -} diff --git a/crates/prune/types/src/pruner.rs b/crates/prune/types/src/pruner.rs new file mode 100644 index 0000000000..dbfafff639 --- /dev/null +++ b/crates/prune/types/src/pruner.rs @@ -0,0 +1,130 @@ +use alloy_primitives::{BlockNumber, TxNumber}; + +use crate::{PruneCheckpoint, PruneLimiter, PruneMode, PruneSegment}; + +/// Pruner run output. +#[derive(Debug)] +pub struct PrunerOutput { + /// Pruning progress. + pub progress: PruneProgress, + /// Pruning output for each segment. + pub segments: Vec<(PruneSegment, SegmentOutput)>, +} + +impl From for PrunerOutput { + fn from(progress: PruneProgress) -> Self { + Self { progress, segments: Vec::new() } + } +} + +/// Segment pruning output. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SegmentOutput { + /// Segment pruning progress. + pub progress: PruneProgress, + /// Number of entries pruned, i.e. deleted from the database. + pub pruned: usize, + /// Pruning checkpoint to save to database, if any. + pub checkpoint: Option, +} + +impl SegmentOutput { + /// Returns a [`SegmentOutput`] with `done = true`, `pruned = 0` and `checkpoint = None`. + /// Use when no pruning is needed. + pub const fn done() -> Self { + Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None } + } + + /// Returns a [`SegmentOutput`] with `done = false`, `pruned = 0` and `checkpoint = None`. + /// Use when pruning is needed but cannot be done. + pub const fn not_done( + reason: PruneInterruptReason, + checkpoint: Option, + ) -> Self { + Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint } + } +} + +/// Segment pruning checkpoint. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] +pub struct SegmentOutputCheckpoint { + /// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet. + pub block_number: Option, + /// Highest pruned transaction number, if applicable. + pub tx_number: Option, +} + +impl SegmentOutputCheckpoint { + /// Converts [`PruneCheckpoint`] to [`SegmentOutputCheckpoint`]. + pub const fn from_prune_checkpoint(checkpoint: PruneCheckpoint) -> Self { + Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number } + } + + /// Converts [`SegmentOutputCheckpoint`] to [`PruneCheckpoint`] with the provided [`PruneMode`] + pub const fn as_prune_checkpoint(&self, prune_mode: PruneMode) -> PruneCheckpoint { + PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode } + } +} + +/// Progress of pruning. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PruneProgress { + /// There is more data to prune. + HasMoreData(PruneInterruptReason), + /// Pruning has been finished. + Finished, +} + +/// Reason for interrupting a prune run. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PruneInterruptReason { + /// Prune run timed out. + Timeout, + /// Limit on the number of deleted entries (rows in the database) per prune run was reached. + DeletedEntriesLimitReached, + /// Unknown reason for stopping prune run. + Unknown, +} + +impl PruneInterruptReason { + /// Creates new [`PruneInterruptReason`] based on the [`PruneLimiter`]. + pub fn new(limiter: &PruneLimiter) -> Self { + if limiter.is_time_limit_reached() { + Self::Timeout + } else if limiter.is_deleted_entries_limit_reached() { + Self::DeletedEntriesLimitReached + } else { + Self::Unknown + } + } + + /// Returns `true` if the reason is timeout. + pub const fn is_timeout(&self) -> bool { + matches!(self, Self::Timeout) + } + + /// Returns `true` if the reason is reaching the limit on deleted entries. + pub const fn is_entries_limit_reached(&self) -> bool { + matches!(self, Self::DeletedEntriesLimitReached) + } +} + +impl PruneProgress { + /// Creates new [`PruneProgress`]. + /// + /// If `done == true`, returns [`PruneProgress::Finished`], otherwise + /// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the + /// passed limiter. + pub fn new(done: bool, limiter: &PruneLimiter) -> Self { + if done { + Self::Finished + } else { + Self::HasMoreData(PruneInterruptReason::new(limiter)) + } + } + + /// Returns `true` if prune run is finished. + pub const fn is_finished(&self) -> bool { + matches!(self, Self::Finished) + } +} diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs index 27d91d9399..e61acf54c1 100644 --- a/crates/stages/stages/src/stages/prune.rs +++ b/crates/stages/stages/src/stages/prune.rs @@ -46,9 +46,10 @@ impl Stage for PruneStage { .build(provider.static_file_provider().clone()); let result = pruner.run(provider, input.target())?; - if result.is_finished() { + if result.progress.is_finished() { Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } else { + info!(target: "sync::stages::prune::exec", segments = ?result.segments, "Pruner has more data to prune"); // We cannot set the checkpoint yet, because prune segments may have different highest // pruned block numbers Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })