feat(pruner, stages): logs for Prune stage (#9520)

This commit is contained in:
Alexey Shekhirin
2024-07-15 17:47:17 +01:00
committed by GitHub
parent 5e07dee4e3
commit 068220bb0a
17 changed files with 260 additions and 228 deletions

View File

@@ -8,7 +8,7 @@ use reth_provider::{
bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown,
ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{PruneProgress, Pruner};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
@@ -118,7 +118,7 @@ impl<DB: Database> DatabaseService<DB> {
/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> PruneProgress {
fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num).expect("todo: handle errors")
}
@@ -201,7 +201,7 @@ pub enum DatabaseAction {
/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
PruneBefore((u64, oneshot::Sender<PruneProgress>)),
PruneBefore((u64, oneshot::Sender<PrunerOutput>)),
}
/// A handle to the database service
@@ -246,7 +246,7 @@ impl DatabaseServiceHandle {
/// Tells the database service to remove block data before the given hash, according to the
/// configured prune config.
pub async fn prune_before(&self, block_num: u64) -> PruneProgress {
pub async fn prune_before(&self, block_num: u64) -> PrunerOutput {
let (tx, rx) = oneshot::channel();
self.sender
.send(DatabaseAction::PruneBefore((block_num, tx)))

View File

@@ -8,7 +8,7 @@ use crate::{
use reth_db::Database;
use reth_primitives::{SealedBlock, B256, U256};
use reth_provider::ProviderFactory;
use reth_prune::{PruneProgress, Pruner};
use reth_prune::{Pruner, PrunerOutput};
use std::sync::{
mpsc::{SendError, Sender},
Arc,
@@ -36,7 +36,7 @@ pub enum PersistenceAction {
/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
PruneBefore((u64, oneshot::Sender<PruneProgress>)),
PruneBefore((u64, oneshot::Sender<PrunerOutput>)),
}
/// An error type for when there is a [`SendError`] while sending an action to one of the services.
@@ -172,7 +172,7 @@ impl PersistenceHandle {
/// Tells the persistence service to remove block data before the given hash, according to the
/// configured prune config.
pub async fn prune_before(&self, block_num: u64) -> PruneProgress {
pub async fn prune_before(&self, block_num: u64) -> PrunerOutput {
let (tx, rx) = oneshot::channel();
self.send_action(PersistenceAction::PruneBefore((block_num, tx)))
.expect("should be able to send");

View File

@@ -8,14 +8,14 @@ use alloy_primitives::BlockNumber;
use reth_db_api::database::Database;
use reth_exex_types::FinishedExExHeight;
use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment};
use reth_prune_types::{PruneLimiter, PruneProgress, PruneSegment, PrunerOutput};
use reth_tokio_util::{EventSender, EventStream};
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tracing::debug;
/// Result of [`Pruner::run`] execution.
pub type PrunerResult = Result<PruneProgress, PrunerError>;
pub type PrunerResult = Result<PrunerOutput, PrunerError>;
/// The pruner type itself with the result of [`Pruner::run`]
pub type PrunerWithResult<S, DB> = (Pruner<S, DB>, PrunerResult);
@@ -107,13 +107,13 @@ impl<DB: Database, S> Pruner<DB, S> {
let Some(tip_block_number) =
self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
else {
return Ok(PruneProgress::Finished)
return Ok(PruneProgress::Finished.into())
};
if tip_block_number == 0 {
self.previous_tip_block_number = Some(tip_block_number);
debug!(target: "pruner", %tip_block_number, "Nothing to prune yet");
return Ok(PruneProgress::Finished)
return Ok(PruneProgress::Finished.into())
}
self.event_sender.notify(PrunerEvent::Started { tip_block_number });
@@ -126,7 +126,7 @@ impl<DB: Database, S> Pruner<DB, S> {
limiter = limiter.set_time_limit(timeout);
};
let (stats, deleted_entries, progress) =
let (stats, deleted_entries, output) =
self.prune_segments(provider, tip_block_number, &mut limiter)?;
self.previous_tip_block_number = Some(tip_block_number);
@@ -134,7 +134,7 @@ impl<DB: Database, S> Pruner<DB, S> {
let elapsed = start.elapsed();
self.metrics.duration_seconds.record(elapsed);
let message = match progress {
let message = match output.progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
PruneProgress::Finished => "Pruner finished",
};
@@ -145,14 +145,14 @@ impl<DB: Database, S> Pruner<DB, S> {
?elapsed,
?deleted_entries,
?limiter,
?progress,
?output,
?stats,
"{message}",
);
self.event_sender.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
Ok(progress)
Ok(output)
}
/// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
@@ -165,10 +165,13 @@ impl<DB: Database, S> Pruner<DB, S> {
provider: &DatabaseProviderRW<DB>,
tip_block_number: BlockNumber,
limiter: &mut PruneLimiter,
) -> Result<(PrunerStats, usize, PruneProgress), PrunerError> {
) -> Result<(PrunerStats, usize, PrunerOutput), PrunerError> {
let mut stats = PrunerStats::new();
let mut pruned = 0;
let mut progress = PruneProgress::Finished;
let mut output = PrunerOutput {
progress: PruneProgress::Finished,
segments: Vec::with_capacity(self.segments.len()),
};
for segment in &self.segments {
if limiter.is_limit_reached() {
@@ -194,11 +197,11 @@ impl<DB: Database, S> Pruner<DB, S> {
let segment_start = Instant::now();
let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
let output = segment.prune(
let segment_output = segment.prune(
provider,
PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
)?;
if let Some(checkpoint) = output.checkpoint {
if let Some(checkpoint) = segment_output.checkpoint {
segment
.save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
}
@@ -207,7 +210,7 @@ impl<DB: Database, S> Pruner<DB, S> {
.duration_seconds
.record(segment_start.elapsed());
if let Some(highest_pruned_block) =
output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
segment_output.checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
self.metrics
.get_prune_segment_metrics(segment.segment())
@@ -215,7 +218,8 @@ impl<DB: Database, S> Pruner<DB, S> {
.set(highest_pruned_block as f64);
}
progress = output.progress;
output.progress = segment_output.progress;
output.segments.push((segment.segment(), segment_output));
debug!(
target: "pruner",
@@ -223,21 +227,21 @@ impl<DB: Database, S> Pruner<DB, S> {
purpose = ?segment.purpose(),
%to_block,
?prune_mode,
%output.pruned,
%segment_output.pruned,
"Segment pruning finished"
);
if output.pruned > 0 {
limiter.increment_deleted_entries_count_by(output.pruned);
pruned += output.pruned;
stats.push((segment.segment(), output.pruned, output.progress));
if segment_output.pruned > 0 {
limiter.increment_deleted_entries_count_by(segment_output.pruned);
pruned += segment_output.pruned;
stats.push((segment.segment(), segment_output.pruned, segment_output.progress));
}
} else {
debug!(target: "pruner", segment = ?segment.segment(), purpose = ?segment.purpose(), "Nothing to prune for the segment");
}
}
Ok((stats, pruned, progress))
Ok((stats, pruned, output))
}
/// Returns `true` if the pruning is needed at the provided tip block number.

View File

@@ -10,8 +10,7 @@ use reth_provider::{
errors::provider::ProviderResult, BlockReader, DatabaseProviderRW, PruneCheckpointWriter,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PrunePurpose,
PruneSegment,
PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
};
pub use set::SegmentSet;
pub use static_file::{
@@ -29,9 +28,9 @@ pub use user::{
///
/// Segments are called from [Pruner](crate::Pruner) with the following lifecycle:
/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
/// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`PruneOutput`], call
/// 2. If [`Segment::prune`] returned a [Some] in `checkpoint` of [`SegmentOutput`], call
/// [`Segment::save_checkpoint`].
/// 3. Subtract `pruned` of [`PruneOutput`] from `delete_limit` of next [`PruneInput`].
/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
pub trait Segment<DB: Database>: Debug + Send + Sync {
/// Segment of data that's pruned.
fn segment(&self) -> PruneSegment;
@@ -47,7 +46,7 @@ pub trait Segment<DB: Database>: Debug + Send + Sync {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError>;
) -> Result<SegmentOutput, PrunerError>;
/// Save checkpoint for [`Self::segment`] to the database.
fn save_checkpoint(
@@ -148,51 +147,3 @@ impl PruneInput {
.unwrap_or(0)
}
}
/// Segment pruning output, see [`Segment::prune`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PruneOutput {
pub(crate) progress: PruneProgress,
/// Number of entries pruned, i.e. deleted from the database.
pub(crate) pruned: usize,
/// Pruning checkpoint to save to database, if any.
pub(crate) checkpoint: Option<PruneOutputCheckpoint>,
}
impl PruneOutput {
/// Returns a [`PruneOutput`] with `done = true`, `pruned = 0` and `checkpoint = None`.
/// Use when no pruning is needed.
pub(crate) const fn done() -> Self {
Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None }
}
/// Returns a [`PruneOutput`] with `done = false`, `pruned = 0` and `checkpoint = None`.
/// Use when pruning is needed but cannot be done.
pub(crate) const fn not_done(
reason: PruneInterruptReason,
checkpoint: Option<PruneOutputCheckpoint>,
) -> Self {
Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint }
}
}
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct PruneOutputCheckpoint {
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub(crate) block_number: Option<BlockNumber>,
/// Highest pruned transaction number, if applicable.
pub(crate) tx_number: Option<TxNumber>,
}
impl PruneOutputCheckpoint {
/// Converts [`PruneOutputCheckpoint`] to [`PruneCheckpoint`] with the provided [`PruneMode`]
pub(crate) const fn as_prune_checkpoint(&self, prune_mode: PruneMode) -> PruneCheckpoint {
PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode }
}
}
impl From<PruneCheckpoint> for PruneOutputCheckpoint {
fn from(checkpoint: PruneCheckpoint) -> Self {
Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number }
}
}

View File

@@ -5,28 +5,27 @@
//! - [`crate::segments::static_file::Receipts`] is responsible for pruning receipts on an archive
//! node after static file producer has finished
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint},
PrunerError,
};
use crate::{segments::PruneInput, PrunerError};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{
errors::provider::ProviderResult, DatabaseProviderRW, PruneCheckpointWriter,
TransactionsProvider,
};
use reth_prune_types::{PruneCheckpoint, PruneProgress, PruneSegment};
use reth_prune_types::{
PruneCheckpoint, PruneProgress, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use tracing::trace;
pub(crate) fn prune<DB: Database>(
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
let tx_range_end = *tx_range.end();
@@ -51,10 +50,10 @@ pub(crate) fn prune<DB: Database>(
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
}),
@@ -76,7 +75,7 @@ pub(crate) fn save_checkpoint<DB: Database>(
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput};
use crate::segments::{PruneInput, SegmentOutput};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@@ -162,7 +161,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -1,7 +1,7 @@
use std::num::NonZeroUsize;
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use alloy_primitives::BlockNumber;
@@ -13,7 +13,10 @@ use reth_db::{
transaction::DbTxMut,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW};
use reth_prune_types::{PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment};
use reth_prune_types::{
PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
@@ -50,12 +53,12 @@ impl<DB: Database> Segment<DB> for Headers {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let (block_range_start, block_range_end) = match input.get_next_block_range() {
Some(range) => (*range.start(), *range.end()),
None => {
trace!(target: "pruner", "No headers to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
@@ -93,10 +96,10 @@ impl<DB: Database> Segment<DB> for Headers {
let done = last_pruned_block.map_or(false, |block| block == block_range_end);
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: last_pruned_block,
tx_number: None,
}),
@@ -193,8 +196,7 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneOutput,
PruneOutputCheckpoint, Segment,
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256, U256};
use assert_matches::assert_matches;
@@ -202,7 +204,8 @@ mod tests {
use reth_db_api::transaction::DbTx;
use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutputCheckpoint,
};
use reth_stages::test_utils::TestStageDB;
use reth_testing_utils::{generators, generators::random_header_range};
@@ -258,12 +261,12 @@ mod tests {
expected_prune_progress=?expected_result.0,
expected_pruned=?expected_result.1,
result=?result,
"PruneOutput"
"SegmentOutput"
);
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
provider
@@ -327,9 +330,9 @@ mod tests {
let result = segment.prune(&provider, input).unwrap();
assert_eq!(
result,
PruneOutput::not_done(
SegmentOutput::not_done(
PruneInterruptReason::DeletedEntriesLimitReached,
Some(PruneOutputCheckpoint::default())
Some(SegmentOutputCheckpoint::default())
)
);
}

View File

@@ -1,12 +1,12 @@
use crate::{
segments::{PruneInput, PruneOutput, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db_api::database::Database;
use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileProvider, DatabaseProviderRW,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use reth_static_file_types::StaticFileSegment;
#[derive(Debug)]
@@ -39,7 +39,7 @@ impl<DB: Database> Segment<DB> for Receipts {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
crate::segments::receipts::prune(provider, input)
}

View File

@@ -1,11 +1,13 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRW, TransactionsProvider};
use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
@@ -39,12 +41,12 @@ impl<DB: Database> Segment<DB> for Transactions {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No transactions to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
@@ -68,10 +70,10 @@ impl<DB: Database> Segment<DB> for Transactions {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
}),
@@ -81,7 +83,7 @@ impl<DB: Database> Segment<DB> for Transactions {
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment};
use crate::segments::{PruneInput, Segment};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@@ -91,7 +93,8 @@ mod tests {
use reth_db::tables;
use reth_provider::{PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutput,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::{generators, generators::random_block_range};
@@ -140,7 +143,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -1,8 +1,5 @@
use crate::{
segments::{
user::history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint,
Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment},
PrunerError,
};
use itertools::Itertools;
@@ -10,7 +7,8 @@ use reth_db::tables;
use reth_db_api::{database::Database, models::ShardedKey};
use reth_provider::DatabaseProviderRW;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -50,12 +48,12 @@ impl<DB: Database> Segment<DB> for AccountHistory {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No account history to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
@@ -66,9 +64,9 @@ impl<DB: Database> Segment<DB> for AccountHistory {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(PruneOutput::not_done(
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
input.previous_checkpoint.map(|checkpoint| checkpoint.into()),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
@@ -117,10 +115,10 @@ impl<DB: Database> Segment<DB> for AccountHistory {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
@@ -132,7 +130,7 @@ impl<DB: Database> Segment<DB> for AccountHistory {
mod tests {
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
PruneOutput, Segment,
Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
@@ -208,7 +206,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -1,10 +1,10 @@
use crate::{
segments::{PruneInput, PruneOutput, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db_api::database::Database;
use reth_provider::{errors::provider::ProviderResult, DatabaseProviderRW};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use tracing::instrument;
#[derive(Debug)]
@@ -36,7 +36,7 @@ impl<DB: Database> Segment<DB> for Receipts {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
crate::segments::receipts::prune(provider, input)
}

View File

@@ -1,5 +1,5 @@
use crate::{
segments::{PruneInput, PruneOutput, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
@@ -7,7 +7,7 @@ use reth_db_api::database::Database;
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
MINIMUM_PRUNING_DISTANCE,
SegmentOutput, MINIMUM_PRUNING_DISTANCE,
};
use tracing::{instrument, trace};
@@ -40,7 +40,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
// Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`.
@@ -213,7 +213,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput { progress, pruned, checkpoint: None })
Ok(SegmentOutput { progress, pruned, checkpoint: None })
}
}

View File

@@ -1,11 +1,13 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
segments::{PruneInput, Segment},
PrunerError,
};
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use tracing::{instrument, trace};
#[derive(Debug)]
@@ -37,12 +39,12 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No transaction senders to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
let tx_range_end = *tx_range.end();
@@ -67,10 +69,10 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
}),
@@ -80,7 +82,7 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, SenderRecovery};
use crate::segments::{PruneInput, Segment, SegmentOutput, SenderRecovery};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@@ -179,7 +181,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -1,8 +1,5 @@
use crate::{
segments::{
user::history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint,
Segment,
},
segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
PrunerError,
};
use itertools::Itertools;
@@ -14,6 +11,7 @@ use reth_db_api::{
use reth_provider::DatabaseProviderRW;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
SegmentOutputCheckpoint,
};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@@ -53,12 +51,12 @@ impl<DB: Database> Segment<DB> for StorageHistory {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No storage history to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
};
let range_end = *range.end();
@@ -69,9 +67,9 @@ impl<DB: Database> Segment<DB> for StorageHistory {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(PruneOutput::not_done(
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
input.previous_checkpoint.map(|checkpoint| checkpoint.into()),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
@@ -125,10 +123,10 @@ impl<DB: Database> Segment<DB> for StorageHistory {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,
}),
@@ -139,7 +137,7 @@ impl<DB: Database> Segment<DB> for StorageHistory {
#[cfg(test)]
mod tests {
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneOutput, Segment,
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
StorageHistory,
};
use alloy_primitives::{BlockNumber, B256};
@@ -215,7 +213,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -1,12 +1,14 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
segments::{PruneInput, Segment, SegmentOutput},
PrunerError,
};
use rayon::prelude::*;
use reth_db::tables;
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use reth_prune_types::{PruneMode, PruneProgress, PrunePurpose, PruneSegment};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
use tracing::{instrument, trace};
#[derive(Debug)]
@@ -38,12 +40,12 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
&self,
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
) -> Result<SegmentOutput, PrunerError> {
let (start, end) = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No transaction lookup entries to prune");
return Ok(PruneOutput::done())
return Ok(SegmentOutput::done())
}
}
.into_inner();
@@ -94,10 +96,10 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
Ok(SegmentOutput {
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
checkpoint: Some(SegmentOutputCheckpoint {
block_number: last_pruned_block,
tx_number: Some(last_pruned_transaction),
}),
@@ -107,7 +109,7 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, TransactionLookup};
use crate::segments::{PruneInput, Segment, SegmentOutput, TransactionLookup};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@@ -204,7 +206,7 @@ mod tests {
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
SegmentOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);

View File

@@ -11,12 +11,16 @@
mod checkpoint;
mod limiter;
mod mode;
mod pruner;
mod segment;
mod target;
pub use checkpoint::PruneCheckpoint;
pub use limiter::PruneLimiter;
pub use mode::PruneMode;
pub use pruner::{
PruneInterruptReason, PruneProgress, PrunerOutput, SegmentOutput, SegmentOutputCheckpoint,
};
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -99,66 +103,3 @@ impl ReceiptsLogPruneConfig {
Ok(lowest.map(|lowest| lowest.max(pruned_block)))
}
}
/// Progress of pruning.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneProgress {
/// There is more data to prune.
HasMoreData(PruneInterruptReason),
/// Pruning has been finished.
Finished,
}
/// Reason for interrupting a prune run.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneInterruptReason {
/// Prune run timed out.
Timeout,
/// Limit on the number of deleted entries (rows in the database) per prune run was reached.
DeletedEntriesLimitReached,
/// Unknown reason for stopping prune run.
Unknown,
}
impl PruneInterruptReason {
/// Creates new [`PruneInterruptReason`] based on the [`PruneLimiter`].
pub fn new(limiter: &PruneLimiter) -> Self {
if limiter.is_time_limit_reached() {
Self::Timeout
} else if limiter.is_deleted_entries_limit_reached() {
Self::DeletedEntriesLimitReached
} else {
Self::Unknown
}
}
/// Returns `true` if the reason is timeout.
pub const fn is_timeout(&self) -> bool {
matches!(self, Self::Timeout)
}
/// Returns `true` if the reason is reaching the limit on deleted entries.
pub const fn is_entries_limit_reached(&self) -> bool {
matches!(self, Self::DeletedEntriesLimitReached)
}
}
impl PruneProgress {
/// Creates new [`PruneProgress`].
///
/// If `done == true`, returns [`PruneProgress::Finished`], otherwise
/// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the
/// passed limiter.
pub fn new(done: bool, limiter: &PruneLimiter) -> Self {
if done {
Self::Finished
} else {
Self::HasMoreData(PruneInterruptReason::new(limiter))
}
}
/// Returns `true` if prune run is finished.
pub const fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}

View File

@@ -0,0 +1,130 @@
use alloy_primitives::{BlockNumber, TxNumber};
use crate::{PruneCheckpoint, PruneLimiter, PruneMode, PruneSegment};
/// Pruner run output.
#[derive(Debug)]
pub struct PrunerOutput {
/// Pruning progress.
pub progress: PruneProgress,
/// Pruning output for each segment.
pub segments: Vec<(PruneSegment, SegmentOutput)>,
}
impl From<PruneProgress> for PrunerOutput {
fn from(progress: PruneProgress) -> Self {
Self { progress, segments: Vec::new() }
}
}
/// Segment pruning output.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SegmentOutput {
/// Segment pruning progress.
pub progress: PruneProgress,
/// Number of entries pruned, i.e. deleted from the database.
pub pruned: usize,
/// Pruning checkpoint to save to database, if any.
pub checkpoint: Option<SegmentOutputCheckpoint>,
}
impl SegmentOutput {
/// Returns a [`SegmentOutput`] with `done = true`, `pruned = 0` and `checkpoint = None`.
/// Use when no pruning is needed.
pub const fn done() -> Self {
Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None }
}
/// Returns a [`SegmentOutput`] with `done = false`, `pruned = 0` and `checkpoint = None`.
/// Use when pruning is needed but cannot be done.
pub const fn not_done(
reason: PruneInterruptReason,
checkpoint: Option<SegmentOutputCheckpoint>,
) -> Self {
Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint }
}
}
/// Segment pruning checkpoint.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct SegmentOutputCheckpoint {
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub block_number: Option<BlockNumber>,
/// Highest pruned transaction number, if applicable.
pub tx_number: Option<TxNumber>,
}
impl SegmentOutputCheckpoint {
/// Converts [`PruneCheckpoint`] to [`SegmentOutputCheckpoint`].
pub const fn from_prune_checkpoint(checkpoint: PruneCheckpoint) -> Self {
Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number }
}
/// Converts [`SegmentOutputCheckpoint`] to [`PruneCheckpoint`] with the provided [`PruneMode`]
pub const fn as_prune_checkpoint(&self, prune_mode: PruneMode) -> PruneCheckpoint {
PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode }
}
}
/// Progress of pruning.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneProgress {
/// There is more data to prune.
HasMoreData(PruneInterruptReason),
/// Pruning has been finished.
Finished,
}
/// Reason for interrupting a prune run.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneInterruptReason {
/// Prune run timed out.
Timeout,
/// Limit on the number of deleted entries (rows in the database) per prune run was reached.
DeletedEntriesLimitReached,
/// Unknown reason for stopping prune run.
Unknown,
}
impl PruneInterruptReason {
/// Creates new [`PruneInterruptReason`] based on the [`PruneLimiter`].
pub fn new(limiter: &PruneLimiter) -> Self {
if limiter.is_time_limit_reached() {
Self::Timeout
} else if limiter.is_deleted_entries_limit_reached() {
Self::DeletedEntriesLimitReached
} else {
Self::Unknown
}
}
/// Returns `true` if the reason is timeout.
pub const fn is_timeout(&self) -> bool {
matches!(self, Self::Timeout)
}
/// Returns `true` if the reason is reaching the limit on deleted entries.
pub const fn is_entries_limit_reached(&self) -> bool {
matches!(self, Self::DeletedEntriesLimitReached)
}
}
impl PruneProgress {
/// Creates new [`PruneProgress`].
///
/// If `done == true`, returns [`PruneProgress::Finished`], otherwise
/// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the
/// passed limiter.
pub fn new(done: bool, limiter: &PruneLimiter) -> Self {
if done {
Self::Finished
} else {
Self::HasMoreData(PruneInterruptReason::new(limiter))
}
}
/// Returns `true` if prune run is finished.
pub const fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}

View File

@@ -46,9 +46,10 @@ impl<DB: Database> Stage<DB> for PruneStage {
.build(provider.static_file_provider().clone());
let result = pruner.run(provider, input.target())?;
if result.is_finished() {
if result.progress.is_finished() {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
} else {
info!(target: "sync::stages::prune::exec", segments = ?result.segments, "Pruner has more data to prune");
// We cannot set the checkpoint yet, because prune segments may have different highest
// pruned block numbers
Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })