fix: ensure transaction lookup can prune (#19553)

This commit is contained in:
joshieDo
2026-02-03 18:11:13 +00:00
committed by GitHub
parent 6d02565c5e
commit eee27df27c
5 changed files with 390 additions and 64 deletions

View File

@@ -25,6 +25,10 @@ pub use user::{
///
/// This is a generic helper function used by both receipts and bodies pruning
/// when data is stored in static files.
///
/// The checkpoint block number is set to the highest block in the actually deleted files,
/// not `input.to_block`, since `to_block` might refer to a block in the middle of an
/// undeleted file.
pub(crate) fn prune_static_files<Provider>(
provider: &Provider,
input: PruneInput,
@@ -37,18 +41,31 @@ where
provider.static_file_provider().delete_segment_below_block(segment, input.to_block + 1)?;
if deleted_headers.is_empty() {
return Ok(SegmentOutput::done())
return Ok(SegmentOutput {
progress: PruneProgress::Finished,
pruned: 0,
checkpoint: input
.previous_checkpoint
.map(SegmentOutputCheckpoint::from_prune_checkpoint),
})
}
let tx_ranges = deleted_headers.iter().filter_map(|header| header.tx_range());
let pruned = tx_ranges.clone().map(|range| range.len()).sum::<u64>() as usize;
// The highest block number in the deleted files is the actual checkpoint.
let checkpoint_block = deleted_headers
.iter()
.filter_map(|header| header.block_range())
.map(|range| range.end())
.max();
Ok(SegmentOutput {
progress: PruneProgress::Finished,
pruned,
checkpoint: Some(SegmentOutputCheckpoint {
block_number: Some(input.to_block),
block_number: checkpoint_block,
tx_number: tx_ranges.map(|range| range.end()).max(),
}),
})

View File

@@ -76,10 +76,11 @@ where
} = prune_modes;
Self::default()
// Transaction lookup
// Transaction lookup must run before bodies because it needs to read transaction
// data from static files before bodies deletes them.
.segment_opt(transaction_lookup.map(TransactionLookup::new))
// Bodies - run first since file deletion is fast
.segment_opt(bodies_history.map(Bodies::new))
// Bodies
.segment_opt(bodies_history.map(|mode| Bodies::new(mode, transaction_lookup)))
// Account history
.segment_opt(account_history.map(AccountHistory::new))
// Storage history

View File

@@ -2,9 +2,14 @@ use crate::{
segments::{self, PruneInput, Segment},
PrunerError,
};
use reth_provider::{BlockReader, StaticFileProviderFactory};
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
use alloy_primitives::BlockNumber;
use reth_provider::{BlockReader, PruneCheckpointReader, StaticFileProviderFactory};
use reth_prune_types::{
PruneInterruptReason, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::debug;
/// Segment responsible for pruning transactions in static files.
///
@@ -12,18 +17,79 @@ use reth_static_file_types::StaticFileSegment;
#[derive(Debug)]
pub struct Bodies {
mode: PruneMode,
/// Transaction lookup prune mode. Used to determine if we need to wait for tx lookup pruning
/// before deleting transaction bodies.
tx_lookup_mode: Option<PruneMode>,
}
impl Bodies {
/// Creates a new [`Bodies`] segment with the given prune mode.
pub const fn new(mode: PruneMode) -> Self {
Self { mode }
/// Creates a new [`Bodies`] segment with the given prune mode and optional transaction lookup
/// prune mode for coordination.
pub const fn new(mode: PruneMode, tx_lookup_mode: Option<PruneMode>) -> Self {
Self { mode, tx_lookup_mode }
}
/// Returns the next best block that bodies can prune up to considering the transaction lookup
/// pruning configuration (if any) and progress.
///
/// Returns `None` if there's no block available to prune (e.g., waiting on `tx_lookup`).
fn next_bodies_prune_target<Provider>(
&self,
provider: &Provider,
input: &PruneInput,
) -> Result<Option<BlockNumber>, PrunerError>
where
Provider: PruneCheckpointReader,
{
let Some(tx_lookup_mode) = self.tx_lookup_mode else { return Ok(Some(input.to_block)) };
let tx_lookup_checkpoint = provider
.get_prune_checkpoint(PruneSegment::TransactionLookup)?
.and_then(|cp| cp.block_number);
// Determine the safe prune target, if any.
// tx_lookup's next_pruned_block tells us what block it will prune next.
// - None: tx_lookup will never prune more blocks (e.g. Before(N) reached its target), so
// bodies can prune freely
// - Some(next) > to_block: tx_lookup is ahead of our target, so we're safe to prune
// to_block
// - Some(next) <= to_block: tx_lookup still needs to prune blocks we want to delete, so we
// must wait and only prune up to (next - 1) to preserve tx data it needs
let to_block = match tx_lookup_mode.next_pruned_block(tx_lookup_checkpoint) {
None => Some(input.to_block),
Some(tx_lookup_next) if tx_lookup_next > input.to_block => Some(input.to_block),
Some(tx_lookup_next) => {
// We can only prune bodies up to the block BEFORE tx_lookup's next target.
// tx_lookup_next is the next block tx_lookup will prune, meaning it still needs
// to read transactions from that block. We must preserve those transactions,
// so bodies can only safely delete up to (tx_lookup_next - 1).
let Some(safe) = tx_lookup_next.checked_sub(1) else {
return Ok(None);
};
if input.previous_checkpoint.is_some_and(|cp| cp.block_number.unwrap_or(0) >= safe)
{
// we have pruned what we can
return Ok(None)
}
debug!(
target: "pruner",
to_block = input.to_block,
safe,
"Bodies pruning limited by tx_lookup progress"
);
Some(safe)
}
};
Ok(to_block)
}
}
impl<Provider> Segment<Provider> for Bodies
where
Provider: StaticFileProviderFactory + BlockReader,
Provider: StaticFileProviderFactory + BlockReader + PruneCheckpointReader,
{
fn segment(&self) -> PruneSegment {
PruneSegment::Bodies
@@ -38,7 +104,20 @@ where
}
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
segments::prune_static_files(provider, input, StaticFileSegment::Transactions)
let Some(to_block) = self.next_bodies_prune_target(provider, &input)? else {
debug!(
to_block = input.to_block,
"Transaction lookup still has work to be done up to target block"
);
return Ok(SegmentOutput::not_done(
PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
));
};
// Use the coordinated to_block instead of input.to_block
let adjusted_input = PruneInput { to_block, ..input };
segments::prune_static_files(provider, adjusted_input, StaticFileSegment::Transactions)
}
}
@@ -50,7 +129,8 @@ mod tests {
use reth_exex_types::FinishedExExHeight;
use reth_provider::{
test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
ProviderFactory, StaticFileWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, PruneCheckpointWriter,
StaticFileWriter,
};
use reth_prune_types::{PruneMode, PruneProgress, PruneSegment};
use reth_static_file_types::{
@@ -93,19 +173,83 @@ mod tests {
static_file_provider.initialize_index().expect("initialize index");
}
struct PruneTestCase {
prune_mode: PruneMode,
struct TestCase {
tx_lookup_mode: Option<PruneMode>,
tx_lookup_checkpoint_block: Option<BlockNumber>,
bodies_mode: PruneMode,
expected_pruned: usize,
expected_lowest_block: Option<BlockNumber>,
expected_progress: PruneProgress,
}
impl TestCase {
fn new() -> Self {
Self {
tx_lookup_mode: None,
tx_lookup_checkpoint_block: None,
bodies_mode: PruneMode::Full,
expected_pruned: 0,
expected_lowest_block: None,
expected_progress: PruneProgress::Finished,
}
}
fn with_bodies_mode(mut self, mode: PruneMode) -> Self {
self.bodies_mode = mode;
self
}
fn with_expected_pruned(mut self, pruned: usize) -> Self {
self.expected_pruned = pruned;
self
}
fn with_expected_progress(mut self, progress: PruneProgress) -> Self {
self.expected_progress = progress;
self
}
fn with_lowest_block(mut self, block: BlockNumber) -> Self {
self.expected_lowest_block = Some(block);
self
}
fn with_tx_lookup(mut self, mode: PruneMode, checkpoint: Option<BlockNumber>) -> Self {
self.tx_lookup_mode = Some(mode);
self.tx_lookup_checkpoint_block = checkpoint;
self
}
}
fn run_prune_test(
factory: &ProviderFactory<MockNodeTypesWithDB>,
finished_exex_height_rx: &tokio::sync::watch::Receiver<FinishedExExHeight>,
test_case: PruneTestCase,
test_case: TestCase,
tip: BlockNumber,
) {
let bodies = Bodies::new(test_case.prune_mode);
let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
// Capture highest block before pruning
let static_provider = factory.static_file_provider();
let highest_before =
static_provider.get_highest_static_file_block(StaticFileSegment::Transactions);
// Set up tx_lookup checkpoint if provided
if let Some(checkpoint_block) = test_case.tx_lookup_checkpoint_block {
let provider = factory.database_provider_rw().unwrap();
provider
.save_prune_checkpoint(
PruneSegment::TransactionLookup,
reth_prune_types::PruneCheckpoint {
block_number: Some(checkpoint_block),
tx_number: None,
prune_mode: test_case.tx_lookup_mode.unwrap(),
},
)
.unwrap();
provider.commit().unwrap();
}
let bodies = Bodies::new(test_case.bodies_mode, test_case.tx_lookup_mode);
let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
let mut pruner = Pruner::new_with_factory(
@@ -114,27 +258,29 @@ mod tests {
5,
10000,
None,
finished_exex_height_rx.clone(),
finished_exex_height_rx,
);
let result = pruner.run(tip).expect("pruner run");
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.progress, test_case.expected_progress);
assert_eq!(result.segments.len(), 1);
let (segment, output) = &result.segments[0];
assert_eq!(*segment, PruneSegment::Bodies);
assert_eq!(output.pruned, test_case.expected_pruned);
let static_provider = factory.static_file_provider();
assert_eq!(
static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
test_case.expected_lowest_block
);
assert_eq!(
static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
Some(tip)
);
if let Some(expected_lowest) = test_case.expected_lowest_block {
let static_provider = factory.static_file_provider();
assert_eq!(
static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
Some(expected_lowest)
);
assert_eq!(
static_provider.get_highest_static_file_block(StaticFileSegment::Transactions),
highest_before
);
}
}
#[test]
@@ -143,50 +289,82 @@ mod tests {
let tip = 2_499_999;
setup_static_file_jars(&factory, tip);
let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let (_, _finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let test_cases = vec![
// Test 1: PruneMode::Before(750_000) → deletes jar 1 (0-499_999)
PruneTestCase {
prune_mode: PruneMode::Before(750_000),
expected_pruned: 1000,
expected_lowest_block: Some(999_999),
},
// Test 2: PruneMode::Before(850_000) → no deletion (jar 2: 500_000-999_999 contains
// Test 1: PruneMode::Before(750_000) → deletes jar 0 (0-499_999)
// Checkpoint 499_999 != target 749_999 -> HasMoreData
TestCase::new()
.with_bodies_mode(PruneMode::Before(750_000))
.with_expected_pruned(1000)
.with_lowest_block(999_999),
// Test 2: PruneMode::Before(850_000) → no deletion (jar 1: 500_000-999_999 contains
// target)
PruneTestCase {
prune_mode: PruneMode::Before(850_000),
expected_pruned: 0,
expected_lowest_block: Some(999_999),
},
// Test 3: PruneMode::Before(1_599_999) → deletes jar 2 (500_000-999_999) and jar 3
// (1_000_000-1_499_999)
PruneTestCase {
prune_mode: PruneMode::Before(1_599_999),
expected_pruned: 2000,
expected_lowest_block: Some(1_999_999),
},
// Test 4: PruneMode::Distance(500_000) with tip=2_499_999 → deletes jar 4
// (1_500_000-1_999_999)
PruneTestCase {
prune_mode: PruneMode::Distance(500_000),
expected_pruned: 1000,
expected_lowest_block: Some(2_499_999),
},
// Test 5: PruneMode::Before(2_300_000) → no deletion (jar 5: 2_000_000-2_499_999
TestCase::new().with_bodies_mode(PruneMode::Before(850_000)).with_lowest_block(999_999),
// Test 3: PruneMode::Before(1_599_999) → deletes jars 0 and 1 (0-999_999)
// Checkpoint 999_999 != target 1_599_998 -> HasMoreData
TestCase::new()
.with_bodies_mode(PruneMode::Before(1_599_999))
.with_expected_pruned(2000)
.with_lowest_block(1_999_999),
// Test 4: PruneMode::Distance(500_000) with tip=2_499_999 → deletes jar 3
// (1_500_000-1_999_999) Checkpoint 1_999_999 == target 1_999_999 ->
// Finished
TestCase::new()
.with_bodies_mode(PruneMode::Distance(500_000))
.with_expected_pruned(1000)
.with_lowest_block(2_499_999),
// Test 5: PruneMode::Before(2_300_000) → no deletion (jar 4: 2_000_000-2_499_999
// contains target)
PruneTestCase {
prune_mode: PruneMode::Before(2_300_000),
expected_pruned: 0,
expected_lowest_block: Some(2_499_999),
},
TestCase::new()
.with_bodies_mode(PruneMode::Before(2_300_000))
.with_lowest_block(2_499_999),
];
for test_case in test_cases {
run_prune_test(&factory, &finished_exex_height_rx, test_case, tip);
run_prune_test(&factory, test_case, tip);
}
}
#[test]
fn checkpoint_reflects_deleted_files_not_target() {
// Test that checkpoint is set to the highest deleted block, not to_block.
// When to_block falls in the middle of an undeleted file, checkpoint should reflect
// what was actually deleted.
let factory = create_test_provider_factory();
let tip = 1_499_999;
setup_static_file_jars(&factory, tip);
// Use PruneMode::Before(900_000) which targets 899_999.
// This should delete jar 0 (0-499_999) since it's entirely below the target.
// Jar 1 (500_000-999_999) contains the target, so it won't be deleted.
// Checkpoint should be 499_999 (end of jar 0), not 899_999 (to_block).
let bodies = Bodies::new(PruneMode::Before(900_000), None);
let segments: Vec<Box<dyn Segment<_>>> = vec![Box::new(bodies)];
let (_, finished_exex_height_rx) = tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let mut pruner =
Pruner::new_with_factory(factory, segments, 5, 10000, None, finished_exex_height_rx);
let result = pruner.run(tip).expect("pruner run");
assert_eq!(result.progress, PruneProgress::Finished);
assert_eq!(result.segments.len(), 1);
let (segment, output) = &result.segments[0];
assert_eq!(*segment, PruneSegment::Bodies);
// Verify checkpoint is set to the end of deleted jar (499_999), not to_block (899_999)
let checkpoint_block = output.checkpoint.as_ref().and_then(|cp| cp.block_number);
assert_eq!(
checkpoint_block,
Some(499_999),
"Checkpoint should be 499_999 (end of deleted jar 0), not 899_999 (to_block)"
);
}
#[test]
fn min_block_updated_on_sync() {
// Regression test: update_index must update min_block to prevent stale values
@@ -302,4 +480,100 @@ mod tests {
assert_eq!(deleted.len(), expected_deleted);
}
}
#[test]
fn bodies_with_tx_lookup_coordination() {
// Test that bodies pruning correctly coordinates with tx lookup pruning
// Using tip = 1_523_000 creates 4 static file jars:
// - Jar 0: blocks 0-499_999, txs 0-999
// - Jar 1: blocks 500_000-999_999, txs 1000-1999
// - Jar 2: blocks 1_000_000-1_499_999, txs 2000-2999
// - Jar 3: blocks 1_500_000-1_523_000, txs 3000-3999
let tip = 1_523_000;
let test_cases = vec![
// Scenario 1: tx_lookup disabled, bodies can prune freely (deletes jar 0)
// Checkpoint is 499_999 (end of jar 0), target is 599_999, so HasMoreData
TestCase::new()
.with_bodies_mode(PruneMode::Before(600_000))
.with_expected_pruned(1000)
.with_lowest_block(999_999),
// Scenario 2: tx_lookup enabled but not run yet, bodies cannot prune
TestCase::new()
.with_tx_lookup(PruneMode::Before(600_000), None)
.with_bodies_mode(PruneMode::Before(600_000))
.with_expected_progress(PruneProgress::HasMoreData(
PruneInterruptReason::WaitingOnSegment(PruneSegment::TransactionLookup),
))
.with_lowest_block(499_999), // No jars deleted, jar 0 ends at 499_999
// Scenario 3: tx_lookup caught up to its target, bodies can prune freely
// Deletes jar 0, checkpoint is 499_999, target is 599_999 -> HasMoreData
TestCase::new()
.with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
.with_bodies_mode(PruneMode::Before(600_000))
.with_expected_pruned(1000)
.with_lowest_block(999_999),
// Scenario 4: tx_lookup behind its target, bodies limited to tx_lookup checkpoint
// tx_lookup should prune up to 599_999, but checkpoint is only at 250_000
// bodies wants to prune up to 599_999, but limited to 250_000
// No jars deleted because jar 0 (0-499_999) ends beyond 250_000
TestCase::new()
.with_tx_lookup(PruneMode::Before(600_000), Some(250_000))
.with_bodies_mode(PruneMode::Before(600_000))
.with_lowest_block(499_999), // No jars deleted
// Scenario 5: Both use Distance, tx_lookup caught up
// With tip=1_523_000, Distance(500_000) targets block 1_023_000
// Deletes jars 0 and 1, checkpoint is 999_999, target is 1_023_000 -> HasMoreData
TestCase::new()
.with_tx_lookup(PruneMode::Distance(500_000), Some(1_023_000))
.with_bodies_mode(PruneMode::Distance(500_000))
.with_expected_pruned(2000)
.with_lowest_block(1_499_999),
// Scenario 6: Both use Distance, tx_lookup less aggressive (bigger distance) than
// bodies With tip=1_523_000:
// - tx_lookup: Distance(1_000_000) targets block 523_000, checkpoint at 523_000
// - bodies: Distance(500_000) targets block 1_023_000
// Bodies can prune up to what tx_lookup has finished (523_000), deleting jar 0
// Checkpoint is 499_999, target is 1_023_000 -> HasMoreData
TestCase::new()
.with_tx_lookup(PruneMode::Distance(1_000_000), Some(523_000))
.with_bodies_mode(PruneMode::Distance(500_000))
.with_expected_pruned(1000) // Jar 0 deleted
.with_lowest_block(999_999), // Jar 0 (0-499_999) deleted
// Scenario 7: tx_lookup more aggressive than bodies (deletes jar 0 and 1)
// tx_lookup: Before(1_100_000) -> prune up to 1_099_999
// bodies: Before(1_100_000) -> wants to prune up to 1_099_999
// Checkpoint is 999_999, target is 1_099_999 -> HasMoreData
TestCase::new()
.with_tx_lookup(PruneMode::Before(1_100_000), Some(1_099_999))
.with_bodies_mode(PruneMode::Before(1_100_000))
.with_expected_pruned(2000)
.with_lowest_block(1_499_999), // Jars 0 and 1 deleted
// Scenario 8: tx_lookup has lower target than bodies, but is done
// tx_lookup: Before(600_000) -> prune up to 599_999 (checkpoint at 599_999, DONE)
// bodies: Before(1_100_000) -> wants to prune up to 1_099_999
// Since tx_lookup is done (next_pruned_block returns None), bodies can prune freely
// Checkpoint is 999_999, target is 1_099_999 -> HasMoreData
TestCase::new()
.with_tx_lookup(PruneMode::Before(600_000), Some(599_999))
.with_bodies_mode(PruneMode::Before(1_100_000))
.with_expected_pruned(2000)
.with_lowest_block(1_499_999), // Jars 0 and 1 deleted
// Scenario 9: Perfect alignment - checkpoint equals target
// bodies: Before(1_000_000) -> targets 999_999
// Deletes jars 0 and 1 (0-999_999), checkpoint is 999_999 which equals target ->
// Finished
TestCase::new()
.with_bodies_mode(PruneMode::Before(1_000_000))
.with_expected_pruned(2000)
.with_expected_progress(PruneProgress::Finished)
.with_lowest_block(1_499_999), // Jars 0 and 1 deleted
];
for test_case in test_cases {
let factory = create_test_provider_factory();
setup_static_file_jars(&factory, tip);
run_prune_test(&factory, test_case, tip);
}
}
}

View File

@@ -84,6 +84,38 @@ impl PruneMode {
pub const fn is_distance(&self) -> bool {
matches!(self, Self::Distance(_))
}
/// Returns the next block number that will EVENTUALLY be pruned after the given checkpoint. It
/// should not be used to find if there are blocks to be pruned right now. For that, use
/// [`Self::prune_target_block`].
///
/// This is independent of the current tip and indicates what block is next in the pruning
/// sequence according to this mode's configuration. Returns `None` if no more blocks will
/// be pruned (i.e., the mode has reached its target).
///
/// # Examples
///
/// - `Before(10)` with checkpoint at block 5 returns `Some(6)`
/// - `Before(10)` with checkpoint at block 9 returns `None` (done)
/// - `Distance(100)` with checkpoint at block 1000 returns `Some(1001)` (always has more)
/// - `Full` always returns the next block after checkpoint
pub const fn next_pruned_block(&self, checkpoint: Option<BlockNumber>) -> Option<BlockNumber> {
let next = match checkpoint {
Some(c) => c + 1,
None => 0,
};
match self {
Self::Before(n) => {
if next < *n {
Some(next)
} else {
None
}
}
Self::Distance(_) | Self::Full => Some(next),
}
}
}
#[cfg(test)]

View File

@@ -93,7 +93,7 @@ impl SegmentOutput {
Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None }
}
/// Returns a [`SegmentOutput`] with `done = false`, `pruned = 0` and `checkpoint = None`.
/// Returns a [`SegmentOutput`] with `done = false`, `pruned = 0` and the given checkpoint.
/// Use when pruning is needed but cannot be done.
pub const fn not_done(
reason: PruneInterruptReason,
@@ -142,6 +142,8 @@ pub enum PruneInterruptReason {
Timeout,
/// Limit on the number of deleted entries (rows in the database) per prune run was reached.
DeletedEntriesLimitReached,
/// Waiting for another segment to finish pruning before this segment can proceed.
WaitingOnSegment(PruneSegment),
/// Unknown reason for stopping prune run.
Unknown,
}