fix(stages): transaction lookup stage checkpoint calculation (#3909)

This commit is contained in:
Alexey Shekhirin
2023-07-26 17:44:46 +01:00
committed by GitHub
parent aa5d39dd6d
commit 74bbe5afa8

View File

@@ -11,9 +11,9 @@ use reth_db::{
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H256,
PrunePart, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::DatabaseProviderRW;
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader};
use tokio::sync::mpsc;
use tracing::*;
@@ -183,9 +183,20 @@ fn calculate_hash(
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64,
// If `TxHashNumber` table was pruned, we will have a number of entries in it not matching
// the actual number of processed transactions. To fix that, we add the number of pruned
// `TxHashNumber` entries.
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64 + pruned_entries,
total: provider.tx_ref().entries::<tables::Transactions>()? as u64,
})
}
@@ -202,8 +213,13 @@ mod tests {
generators,
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
use reth_provider::{BlockReader, ProviderError, TransactionsProvider};
use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, H256,
MAINNET,
};
use reth_provider::{
BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider,
};
// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
@@ -321,6 +337,57 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
#[test]
fn stage_checkpoint_pruned() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let max_pruned_block = 30;
let max_processed_block = 70;
let mut tx_hash_numbers = Vec::new();
let mut tx_hash_number = 0;
for block in &blocks[..=max_processed_block] {
for transaction in &block.body {
if block.number > max_pruned_block {
tx_hash_numbers.push((transaction.hash, tx_hash_number));
}
tx_hash_number += 1;
}
}
tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
prune_mode: PruneMode::Full,
},
)
.expect("save stage checkpoint");
provider.commit().expect("commit");
let db = tx.inner_raw();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().expect("provider rw");
assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {
processed: blocks[..=max_processed_block]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
total: blocks.iter().map(|block| block.body.len() as u64).sum::<u64>()
}
);
}
struct TransactionLookupTestRunner {
tx: TestTransaction,
threshold: u64,