mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-31 01:58:17 -05:00
feat(pruner): transaction lookup (#3892)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5588,6 +5588,7 @@ version = "0.1.0-alpha.4"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"itertools 0.10.5",
|
||||
"rayon",
|
||||
"reth-db",
|
||||
"reth-interfaces",
|
||||
"reth-primitives",
|
||||
|
||||
@@ -21,6 +21,7 @@ reth-interfaces = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
itertools = "0.10"
|
||||
rayon = "1.6.0"
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
|
||||
@@ -4,6 +4,9 @@ use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PrunerError {
|
||||
#[error("Inconsistent data: {0}")]
|
||||
InconsistentData(&'static str),
|
||||
|
||||
#[error("An interface error occurred.")]
|
||||
Interface(#[from] reth_interfaces::Error),
|
||||
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
//! Support for pruning.
|
||||
|
||||
use crate::PrunerError;
|
||||
use rayon::prelude::*;
|
||||
use reth_db::{database::Database, tables};
|
||||
use reth_primitives::{BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart};
|
||||
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointWriter};
|
||||
use std::sync::Arc;
|
||||
use reth_primitives::{
|
||||
BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
TransactionsProvider,
|
||||
};
|
||||
use std::{ops::RangeInclusive, sync::Arc};
|
||||
use tracing::{debug, instrument, trace};
|
||||
|
||||
/// Result of [Pruner::run] execution
|
||||
@@ -15,11 +21,12 @@ pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
|
||||
|
||||
pub struct BatchSizes {
|
||||
receipts: usize,
|
||||
transaction_lookup: usize,
|
||||
}
|
||||
|
||||
impl Default for BatchSizes {
|
||||
fn default() -> Self {
|
||||
Self { receipts: 10000 }
|
||||
Self { receipts: 10000, transaction_lookup: 10000 }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,6 +77,12 @@ impl<DB: Database> Pruner<DB> {
|
||||
self.prune_receipts(&provider, to_block, prune_mode)?;
|
||||
}
|
||||
|
||||
if let Some((to_block, prune_mode)) =
|
||||
self.modes.prune_target_block_transaction_lookup(tip_block_number)
|
||||
{
|
||||
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
|
||||
}
|
||||
|
||||
provider.commit()?;
|
||||
|
||||
self.last_pruned_block_number = Some(tip_block_number);
|
||||
@@ -97,6 +110,37 @@ impl<DB: Database> Pruner<DB> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
|
||||
/// number.
|
||||
///
|
||||
/// To get the range start:
|
||||
/// 1. If checkpoint exists, get next block body and return its first tx number.
|
||||
/// 2. If checkpoint doesn't exist, return 0.
|
||||
///
|
||||
/// To get the range end: get last tx number for the provided `to_block`.
|
||||
fn get_next_tx_num_range_from_checkpoint(
|
||||
&self,
|
||||
provider: &DatabaseProviderRW<'_, DB>,
|
||||
prune_part: PrunePart,
|
||||
to_block: BlockNumber,
|
||||
) -> reth_interfaces::Result<Option<RangeInclusive<TxNumber>>> {
|
||||
let from_tx_num = provider
|
||||
.get_prune_checkpoint(prune_part)?
|
||||
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1))
|
||||
.transpose()?
|
||||
.flatten()
|
||||
.map(|body| body.first_tx_num)
|
||||
.unwrap_or_default();
|
||||
|
||||
let to_tx_num = match provider.block_body_indices(to_block)? {
|
||||
Some(body) => body,
|
||||
None => return Ok(None),
|
||||
}
|
||||
.last_tx_num();
|
||||
|
||||
Ok(Some(from_tx_num..=to_tx_num))
|
||||
}
|
||||
|
||||
/// Prune receipts up to the provided block, inclusive.
|
||||
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
|
||||
fn prune_receipts(
|
||||
@@ -105,16 +149,20 @@ impl<DB: Database> Pruner<DB> {
|
||||
to_block: BlockNumber,
|
||||
prune_mode: PruneMode,
|
||||
) -> PrunerResult {
|
||||
let to_block_body = match provider.block_body_indices(to_block)? {
|
||||
Some(body) => body,
|
||||
let range = match self.get_next_tx_num_range_from_checkpoint(
|
||||
provider,
|
||||
PrunePart::Receipts,
|
||||
to_block,
|
||||
)? {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
trace!(target: "pruner", "No receipts to prune");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
provider.prune_table_in_batches::<tables::Receipts, _, _>(
|
||||
..=to_block_body.last_tx_num(),
|
||||
provider.prune_table_in_batches::<tables::Receipts, _>(
|
||||
range,
|
||||
self.batch_sizes.receipts,
|
||||
|receipts| {
|
||||
trace!(
|
||||
@@ -132,6 +180,71 @@ impl<DB: Database> Pruner<DB> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prune transaction lookup entries up to the provided block, inclusive.
|
||||
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
|
||||
fn prune_transaction_lookup(
|
||||
&self,
|
||||
provider: &DatabaseProviderRW<'_, DB>,
|
||||
to_block: BlockNumber,
|
||||
prune_mode: PruneMode,
|
||||
) -> PrunerResult {
|
||||
let range = match self.get_next_tx_num_range_from_checkpoint(
|
||||
provider,
|
||||
PrunePart::TransactionLookup,
|
||||
to_block,
|
||||
)? {
|
||||
Some(range) => range,
|
||||
None => {
|
||||
trace!(target: "pruner", "No receipts to prune");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
let last_tx_num = *range.end();
|
||||
|
||||
for i in range.step_by(self.batch_sizes.transaction_lookup) {
|
||||
// The `min` ensures that the transaction range doesn't exceed the last transaction
|
||||
// number. `last_tx_num + 1` is used to include the last transaction in the range.
|
||||
let tx_range = i..(i + self.batch_sizes.transaction_lookup as u64).min(last_tx_num + 1);
|
||||
|
||||
// Retrieve transactions in the range and calculate their hashes in parallel
|
||||
let mut hashes = provider
|
||||
.transactions_by_tx_range(tx_range.clone())?
|
||||
.into_par_iter()
|
||||
.map(|transaction| transaction.hash())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Number of transactions retrieved from the database should match the tx range count
|
||||
let tx_count = tx_range.clone().count();
|
||||
if hashes.len() != tx_count {
|
||||
return Err(PrunerError::InconsistentData(
|
||||
"Unexpected number of transaction hashes retrieved by transaction number range",
|
||||
))
|
||||
}
|
||||
|
||||
// Pre-sort hashes to prune them in order
|
||||
hashes.sort();
|
||||
|
||||
provider.prune_table_in_batches::<tables::TxHashNumber, _>(
|
||||
hashes,
|
||||
self.batch_sizes.transaction_lookup,
|
||||
|entries| {
|
||||
trace!(
|
||||
target: "pruner",
|
||||
%entries,
|
||||
"Pruned transaction lookup"
|
||||
);
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
provider.save_prune_checkpoint(
|
||||
PrunePart::TransactionLookup,
|
||||
PruneCheckpoint { block_number: to_block, prune_mode },
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -143,7 +256,9 @@ mod tests {
|
||||
generators,
|
||||
generators::{random_block_range, random_receipt},
|
||||
};
|
||||
use reth_primitives::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET};
|
||||
use reth_primitives::{
|
||||
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET,
|
||||
};
|
||||
use reth_provider::PruneCheckpointReader;
|
||||
use reth_stages::test_utils::TestTransaction;
|
||||
|
||||
@@ -192,34 +307,103 @@ mod tests {
|
||||
tx.table::<tables::Receipts>().unwrap().len()
|
||||
);
|
||||
|
||||
let prune_to_block = 10;
|
||||
let prune_mode = PruneMode::Before(prune_to_block);
|
||||
let pruner = Pruner::new(
|
||||
tx.inner_raw(),
|
||||
MAINNET.clone(),
|
||||
5,
|
||||
0,
|
||||
PruneModes { receipts: Some(prune_mode), ..Default::default() },
|
||||
BatchSizes {
|
||||
// Less than total amount of blocks to prune to test the batching logic
|
||||
receipts: 10,
|
||||
},
|
||||
);
|
||||
let test_prune = |to_block: BlockNumber| {
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let pruner = Pruner::new(
|
||||
tx.inner_raw(),
|
||||
MAINNET.clone(),
|
||||
5,
|
||||
0,
|
||||
PruneModes { receipts: Some(prune_mode), ..Default::default() },
|
||||
BatchSizes {
|
||||
// Less than total amount of blocks to prune to test the batching logic
|
||||
receipts: 10,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
let provider = tx.inner_rw();
|
||||
assert_matches!(pruner.prune_receipts(&provider, prune_to_block, prune_mode), Ok(()));
|
||||
provider.commit().expect("commit");
|
||||
let provider = tx.inner_rw();
|
||||
assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(()));
|
||||
provider.commit().expect("commit");
|
||||
|
||||
assert_eq!(
|
||||
tx.table::<tables::Receipts>().unwrap().len(),
|
||||
blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::<usize>()
|
||||
);
|
||||
assert_eq!(
|
||||
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
|
||||
Some(PruneCheckpoint { block_number: to_block, prune_mode })
|
||||
);
|
||||
};
|
||||
|
||||
// Pruning first time ever, no previous checkpoint is present
|
||||
test_prune(10);
|
||||
// Prune second time, previous checkpoint is present, should continue pruning from where
|
||||
// ended last time
|
||||
test_prune(20);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prune_transaction_lookup() {
|
||||
let tx = TestTransaction::default();
|
||||
let mut rng = generators::rng();
|
||||
|
||||
let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
|
||||
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
|
||||
|
||||
let mut tx_hash_numbers = Vec::new();
|
||||
for block in &blocks {
|
||||
for transaction in &block.body {
|
||||
tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64));
|
||||
}
|
||||
}
|
||||
tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");
|
||||
|
||||
assert_eq!(
|
||||
tx.table::<tables::Receipts>().unwrap().len(),
|
||||
blocks[prune_to_block as usize + 1..]
|
||||
.iter()
|
||||
.map(|block| block.body.len())
|
||||
.sum::<usize>()
|
||||
tx.table::<tables::Transactions>().unwrap().len(),
|
||||
blocks.iter().map(|block| block.body.len()).sum::<usize>()
|
||||
);
|
||||
assert_eq!(
|
||||
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
|
||||
Some(PruneCheckpoint { block_number: prune_to_block, prune_mode })
|
||||
tx.table::<tables::Transactions>().unwrap().len(),
|
||||
tx.table::<tables::TxHashNumber>().unwrap().len()
|
||||
);
|
||||
|
||||
let test_prune = |to_block: BlockNumber| {
|
||||
let prune_mode = PruneMode::Before(to_block);
|
||||
let pruner = Pruner::new(
|
||||
tx.inner_raw(),
|
||||
MAINNET.clone(),
|
||||
5,
|
||||
0,
|
||||
PruneModes { transaction_lookup: Some(prune_mode), ..Default::default() },
|
||||
BatchSizes {
|
||||
// Less than total amount of blocks to prune to test the batching logic
|
||||
transaction_lookup: 10,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
let provider = tx.inner_rw();
|
||||
assert_matches!(
|
||||
pruner.prune_transaction_lookup(&provider, to_block, prune_mode),
|
||||
Ok(())
|
||||
);
|
||||
provider.commit().expect("commit");
|
||||
|
||||
assert_eq!(
|
||||
tx.table::<tables::TxHashNumber>().unwrap().len(),
|
||||
blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::<usize>()
|
||||
);
|
||||
assert_eq!(
|
||||
tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(),
|
||||
Some(PruneCheckpoint { block_number: to_block, prune_mode })
|
||||
);
|
||||
};
|
||||
|
||||
// Pruning first time ever, no previous checkpoint is present
|
||||
test_prune(10);
|
||||
// Prune second time, previous checkpoint is present, should continue pruning from where
|
||||
// ended last time
|
||||
test_prune(20);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use reth_db::{
|
||||
};
|
||||
use reth_primitives::{
|
||||
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
|
||||
TxNumber, H256, MAINNET, U256,
|
||||
TxHash, TxNumber, H256, MAINNET, U256,
|
||||
};
|
||||
use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory};
|
||||
use std::{
|
||||
@@ -268,6 +268,18 @@ impl TestTransaction {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn insert_tx_hash_numbers<I>(&self, tx_hash_numbers: I) -> Result<(), DbError>
|
||||
where
|
||||
I: IntoIterator<Item = (TxHash, TxNumber)>,
|
||||
{
|
||||
self.commit(|tx| {
|
||||
tx_hash_numbers.into_iter().try_for_each(|(tx_hash, tx_num)| {
|
||||
// Insert into tx hash numbers table.
|
||||
tx.put::<tables::TxHashNumber>(tx_hash, tx_num)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Insert collection of ([TxNumber], [Receipt]) into the corresponding table.
|
||||
pub fn insert_receipts<I>(&self, receipts: I) -> Result<(), DbError>
|
||||
where
|
||||
|
||||
@@ -619,40 +619,39 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prune the table for the specified key range.
|
||||
/// Prune the table for the specified pre-sorted key iterator.
|
||||
/// Returns number of rows pruned.
|
||||
pub fn prune_table<T, K>(
|
||||
&self,
|
||||
range: impl RangeBounds<K>,
|
||||
keys: impl IntoIterator<Item = K>,
|
||||
) -> std::result::Result<usize, DatabaseError>
|
||||
where
|
||||
T: Table<Key = K>,
|
||||
K: Key,
|
||||
{
|
||||
self.prune_table_in_batches::<T, K, _>(range, usize::MAX, |_| {})
|
||||
self.prune_table_in_batches::<T, K>(keys, usize::MAX, |_| {})
|
||||
}
|
||||
|
||||
/// Prune the table for the specified key range calling `chunk_callback` after every
|
||||
/// `batch_size` pruned rows.
|
||||
/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
|
||||
/// every `batch_size` pruned rows.
|
||||
///
|
||||
/// Returns number of rows pruned.
|
||||
pub fn prune_table_in_batches<T, K, F>(
|
||||
pub fn prune_table_in_batches<T, K>(
|
||||
&self,
|
||||
range: impl RangeBounds<K>,
|
||||
keys: impl IntoIterator<Item = K>,
|
||||
batch_size: usize,
|
||||
batch_callback: F,
|
||||
batch_callback: impl Fn(usize),
|
||||
) -> std::result::Result<usize, DatabaseError>
|
||||
where
|
||||
T: Table<Key = K>,
|
||||
K: Key,
|
||||
F: Fn(usize),
|
||||
{
|
||||
let mut cursor = self.tx.cursor_write::<T>()?;
|
||||
let mut walker = cursor.walk_range(range)?;
|
||||
let mut deleted = 0;
|
||||
|
||||
while let Some(Ok(_)) = walker.next() {
|
||||
walker.delete_current()?;
|
||||
for key in keys {
|
||||
cursor.seek_exact(key)?;
|
||||
cursor.delete_current()?;
|
||||
deleted += 1;
|
||||
|
||||
if deleted % batch_size == 0 {
|
||||
|
||||
Reference in New Issue
Block a user