feat(stages): get transaction range starting from first available block (#19662)

This commit is contained in:
Alexey Shekhirin
2025-11-12 14:43:39 +00:00
committed by GitHub
parent c57a5204c2
commit 95b8a8535b
5 changed files with 211 additions and 26 deletions

3
Cargo.lock generated
View File

@@ -10508,7 +10508,10 @@ dependencies = [
"auto_impl",
"futures-util",
"metrics",
"reth-chainspec",
"reth-consensus",
"reth-db",
"reth-db-api",
"reth-errors",
"reth-metrics",
"reth-network-p2p",

View File

@@ -149,7 +149,7 @@ mod tests {
let static_provider = factory.static_file_provider();
assert_eq!(
static_provider.get_lowest_static_file_block(StaticFileSegment::Transactions),
static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
test_case.expected_lowest_block
);
assert_eq!(
@@ -291,7 +291,7 @@ mod tests {
// Verify initial state
assert_eq!(
static_provider.get_lowest_static_file_block(StaticFileSegment::Transactions),
static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
expected_before_update,
"Test case {}: Initial min_block mismatch",
idx
@@ -309,7 +309,7 @@ mod tests {
// Verify min_block was updated (not stuck at stale value)
assert_eq!(
static_provider.get_lowest_static_file_block(StaticFileSegment::Transactions),
static_provider.get_lowest_range_end(StaticFileSegment::Transactions),
Some(expected_after_update),
"Test case {}: min_block should be updated to {} (not stuck at stale value)",
idx,

View File

@@ -43,6 +43,9 @@ auto_impl.workspace = true
[dev-dependencies]
assert_matches.workspace = true
reth-chainspec.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-db-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
tokio = { workspace = true, features = ["sync", "rt-multi-thread"] }
tokio-stream.workspace = true
@@ -50,9 +53,12 @@ reth-testing-utils.workspace = true
[features]
test-utils = [
"reth-chainspec/test-utils",
"reth-consensus/test-utils",
"reth-db-api/test-utils",
"reth-db/test-utils",
"reth-network-p2p/test-utils",
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-stages-types/test-utils",
"reth-primitives-traits/test-utils",
]

View File

@@ -1,12 +1,13 @@
use crate::{error::StageError, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_provider::{BlockReader, ProviderError};
use reth_provider::{BlockReader, ProviderError, StaticFileProviderFactory, StaticFileSegment};
use std::{
cmp::{max, min},
future::{poll_fn, Future},
ops::{Range, RangeInclusive},
task::{Context, Poll},
};
use tracing::instrument;
/// Stage execution input, see [`Stage::execute`].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
@@ -70,15 +71,29 @@ impl ExecInput {
/// Return the next block range determined the number of transactions within it.
/// This function walks the block indices until either the end of the range is reached or
/// the number of transactions exceeds the threshold.
#[instrument(level = "debug", target = "sync::stages", skip(provider), ret)]
pub fn next_block_range_with_transaction_threshold<Provider>(
&self,
provider: &Provider,
tx_threshold: u64,
) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError>
where
Provider: BlockReader,
Provider: StaticFileProviderFactory + BlockReader,
{
let start_block = self.next_block();
// Get lowest available block number for transactions
let Some(lowest_transactions_block) =
provider.static_file_provider().get_lowest_range_start(StaticFileSegment::Transactions)
else {
return Ok((0..0, 0..=0, true));
};
// We can only process transactions that have associated static files, so we cap the start
// block by lowest available block number.
//
// Certain transactions may not have associated static files when user deletes them
// manually. In that case, we can't process them, and need to adjust the start block
// accordingly.
let start_block = self.next_block().max(lowest_transactions_block);
let target_block = self.target();
let start_block_body = provider
@@ -277,3 +292,164 @@ pub trait StageExt<Provider>: Stage<Provider> {
}
impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}
#[cfg(test)]
mod tests {
use reth_chainspec::MAINNET;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_db_api::{models::StoredBlockBodyIndices, tables, transaction::DbTxMut};
use reth_provider::{
test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderBuilder,
StaticFileProviderFactory, StaticFileSegment,
};
use reth_stages_types::StageCheckpoint;
use reth_testing_utils::generators::{self, random_signed_tx};
use crate::ExecInput;
#[test]
fn test_exec_input_next_block_range_with_transaction_threshold() {
let mut rng = generators::rng();
let provider_factory = ProviderFactory::<MockNodeTypesWithDB>::new(
create_test_rw_db(),
MAINNET.clone(),
StaticFileProviderBuilder::read_write(create_test_static_files_dir().0.keep())
.unwrap()
.with_blocks_per_file(1)
.build()
.unwrap(),
)
.unwrap();
// Without checkpoint, without transactions in static files
{
let exec_input = ExecInput { target: Some(100), checkpoint: None };
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 0..0);
assert_eq!(block_range, 0..=0);
assert!(is_final_range);
}
// With checkpoint at block 10, without transactions in static files
{
let exec_input =
ExecInput { target: Some(1), checkpoint: Some(StageCheckpoint::new(10)) };
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 0..0);
assert_eq!(block_range, 0..=0);
assert!(is_final_range);
}
// Without checkpoint, with transactions in static files starting from block 1
{
let exec_input = ExecInput { target: Some(1), checkpoint: None };
let mut provider_rw = provider_factory.provider_rw().unwrap();
provider_rw
.tx_mut()
.put::<tables::BlockBodyIndices>(
1,
StoredBlockBodyIndices { first_tx_num: 0, tx_count: 2 },
)
.unwrap();
let mut writer =
provider_rw.get_static_file_writer(0, StaticFileSegment::Transactions).unwrap();
writer.increment_block(0).unwrap();
writer.increment_block(1).unwrap();
writer.append_transaction(0, &random_signed_tx(&mut rng)).unwrap();
writer.append_transaction(1, &random_signed_tx(&mut rng)).unwrap();
drop(writer);
provider_rw.commit().unwrap();
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 0..2);
assert_eq!(block_range, 1..=1);
assert!(is_final_range);
}
// With checkpoint at block 1, with transactions in static files starting from block 1
{
let exec_input =
ExecInput { target: Some(2), checkpoint: Some(StageCheckpoint::new(1)) };
let mut provider_rw = provider_factory.provider_rw().unwrap();
provider_rw
.tx_mut()
.put::<tables::BlockBodyIndices>(
2,
StoredBlockBodyIndices { first_tx_num: 2, tx_count: 1 },
)
.unwrap();
let mut writer =
provider_rw.get_static_file_writer(1, StaticFileSegment::Transactions).unwrap();
writer.increment_block(2).unwrap();
writer.append_transaction(2, &random_signed_tx(&mut rng)).unwrap();
drop(writer);
provider_rw.commit().unwrap();
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 2..3);
assert_eq!(block_range, 2..=2);
assert!(is_final_range);
}
// Without checkpoint, with transactions in static files starting from block 2
{
let exec_input = ExecInput { target: Some(2), checkpoint: None };
provider_factory
.static_file_provider()
.delete_jar(StaticFileSegment::Transactions, 0)
.unwrap();
provider_factory
.static_file_provider()
.delete_jar(StaticFileSegment::Transactions, 1)
.unwrap();
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 2..3);
assert_eq!(block_range, 2..=2);
assert!(is_final_range);
}
// Without checkpoint, with transactions in static files starting from block 2
{
let exec_input =
ExecInput { target: Some(3), checkpoint: Some(StageCheckpoint::new(2)) };
let mut provider_rw = provider_factory.provider_rw().unwrap();
provider_rw
.tx_mut()
.put::<tables::BlockBodyIndices>(
3,
StoredBlockBodyIndices { first_tx_num: 3, tx_count: 1 },
)
.unwrap();
let mut writer =
provider_rw.get_static_file_writer(1, StaticFileSegment::Transactions).unwrap();
writer.increment_block(3).unwrap();
writer.append_transaction(3, &random_signed_tx(&mut rng)).unwrap();
drop(writer);
provider_rw.commit().unwrap();
let (tx_range, block_range, is_final_range) = exec_input
.next_block_range_with_transaction_threshold(&provider_factory, 10)
.unwrap();
assert_eq!(tx_range, 3..4);
assert_eq!(block_range, 3..=3);
assert!(is_final_range);
}
}
}

View File

@@ -601,7 +601,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
let mut deleted_headers = Vec::new();
loop {
let Some(block_height) = self.get_lowest_static_file_block(segment) else {
let Some(block_height) = self.get_lowest_range_end(segment) else {
return Ok(deleted_headers)
};
@@ -1248,24 +1248,6 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
}
/// Gets the lowest transaction static file block if it exists.
///
/// For example if the transactions static file has blocks 0-499, this will return 499..
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_transaction_static_file_block(&self) -> Option<BlockNumber> {
self.get_lowest_static_file_block(StaticFileSegment::Transactions)
}
/// Gets the lowest static file's block height if it exists for a static file segment.
///
/// For example if the static file has blocks 0-499, this will return 499..
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
self.static_files_min_block.read().get(&segment).map(|range| range.end())
}
/// Gets the lowest static file's block range if it exists for a static file segment.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
@@ -1273,6 +1255,24 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
self.static_files_min_block.read().get(&segment).copied()
}
/// Gets the lowest static file's block range start if it exists for a static file segment.
///
/// For example if the lowest static file has blocks 0-499, this will return 0.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
self.get_lowest_range(segment).map(|range| range.start())
}
/// Gets the lowest static file's block range end if it exists for a static file segment.
///
/// For example if the static file has blocks 0-499, this will return 499.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
self.get_lowest_range(segment).map(|range| range.end())
}
/// Gets the highest static file's block height if it exists for a static file segment.
///
/// If there is nothing on disk for the given segment, this will return [`None`].