feat(pruner): transaction senders (#3912)

Alexey Shekhirin authored 2023-08-01 12:30:02 +01:00, committed by GitHub
parent 3a4419625a
commit 4688fd2ae0
4 changed files with 221 additions and 17 deletions

View File

@@ -10,7 +10,12 @@ use serde::{Deserialize, Serialize};
#[serde(default)]
pub struct PruneModes {
/// Sender Recovery pruning configuration.
-#[serde(skip_serializing_if = "Option::is_none")]
+// TODO(alexey): removing the min blocks restriction is possible if we start calculating the senders
+// dynamically on blockchain tree unwind.
+#[serde(
+    skip_serializing_if = "Option::is_none",
+    deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
+)]
pub sender_recovery: Option<PruneMode>,
/// Transaction Lookup pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
@@ -126,7 +131,7 @@ impl PruneModes {
}
impl_prune_parts!(
-(sender_recovery, "SenderRecovery", None),
+(sender_recovery, "SenderRecovery", Some(64)),
(transaction_lookup, "TransactionLookup", None),
(receipts, "Receipts", Some(64)),
(account_history, "AccountHistory", Some(64)),
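
The `deserialize_opt_prune_mode_with_min_blocks` helper referenced in the new attribute is not part of this diff. Below is a minimal sketch of the validation it presumably performs, assuming `PruneMode` variants along these lines (`Full` and `Before` appear elsewhere in the diff; `Distance` is hypothetical here, and reth's real implementation may differ):

use serde::{Deserialize, Deserializer};

// Assumed shape of the prune mode enum: `Full` and `Before` are used in this
// diff; `Distance` is hypothetical.
#[derive(Debug, Deserialize)]
enum PruneMode {
    Full,
    Distance(u64),
    Before(u64),
}

// Sketch: reject deserialized prune modes that would keep fewer than MIN_BLOCKS
// recent blocks of senders, which (per the TODO above) are still needed on
// blockchain tree unwind.
fn deserialize_opt_prune_mode_with_min_blocks<'de, const MIN_BLOCKS: u64, D>(
    deserializer: D,
) -> Result<Option<PruneMode>, D::Error>
where
    D: Deserializer<'de>,
{
    let mode = Option::<PruneMode>::deserialize(deserializer)?;
    match mode {
        // `Full` keeps nothing; `Distance(d)` keeps only the last `d` blocks.
        Some(PruneMode::Full) => Err(serde::de::Error::custom(format!(
            "prune mode must leave at least {MIN_BLOCKS} blocks"
        ))),
        Some(PruneMode::Distance(d)) if d < MIN_BLOCKS => {
            Err(serde::de::Error::custom(format!(
                "prune mode must leave at least {MIN_BLOCKS} blocks, got distance {d}"
            )))
        }
        other => Ok(other),
    }
}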

View File

@@ -22,11 +22,12 @@ pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
pub struct BatchSizes {
receipts: usize,
transaction_lookup: usize,
transaction_senders: usize,
}
impl Default for BatchSizes {
fn default() -> Self {
-Self { receipts: 10000, transaction_lookup: 10000 }
+Self { receipts: 10000, transaction_lookup: 10000, transaction_senders: 10000 }
}
}
@@ -83,6 +84,12 @@ impl<DB: Database> Pruner<DB> {
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
}
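// Transaction senders follow the same checkpoint-driven flow as receipts and tx lookup.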
if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_sender_recovery(tip_block_number)?
{
self.prune_transaction_senders(&provider, to_block, prune_mode)?;
}
provider.commit()?;
self.last_pruned_block_number = Some(tip_block_number);
@@ -124,13 +131,16 @@ impl<DB: Database> Pruner<DB> {
prune_part: PrunePart,
to_block: BlockNumber,
) -> reth_interfaces::Result<Option<RangeInclusive<TxNumber>>> {
-let from_tx_num = provider
-    .get_prune_checkpoint(prune_part)?
-    .map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1))
-    .transpose()?
-    .flatten()
-    .map(|body| body.first_tx_num)
-    .unwrap_or_default();
+let checkpoint = provider.get_prune_checkpoint(prune_part)?.unwrap_or(PruneCheckpoint {
+    block_number: 0, // No checkpoint, fresh pruning
+    prune_mode: PruneMode::Full, // Doesn't matter in this case, can be anything
+});
+// Get the first transaction of the next block after the highest pruned one
+let from_tx_num =
+    provider.block_body_indices(checkpoint.block_number + 1)?.map(|body| body.first_tx_num);
+// If no block body index is found, the DB is either corrupted or we've already pruned up to
+// the latest block, so there's nothing to prune now.
+let Some(from_tx_num) = from_tx_num else { return Ok(None) };
let to_tx_num = match provider.block_body_indices(to_block)? {
Some(body) => body,
@@ -200,7 +210,7 @@ impl<DB: Database> Pruner<DB> {
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
trace!(target: "pruner", "No transaction lookup entries to prune");
return Ok(())
}
};
@@ -248,6 +258,50 @@ impl<DB: Database> Pruner<DB> {
Ok(())
}
/// Prune transaction senders up to the provided block, inclusive.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_transaction_senders(
&self,
provider: &DatabaseProviderRW<'_, DB>,
to_block: BlockNumber,
prune_mode: PruneMode,
) -> PrunerResult {
let range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::SenderRecovery,
to_block,
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No transaction senders to prune");
return Ok(())
}
};
let total = range.clone().count();
let mut processed = 0;
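// `total` is the size of the whole range; `processed` accumulates per batch
// so the trace below can report percentage progress.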
provider.prune_table_in_batches::<tables::TxSenders, _>(
range,
self.batch_sizes.transaction_senders,
|entries| {
processed += entries;
trace!(
target: "pruner",
%entries,
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
"Pruned transaction senders"
);
},
)?;
provider.save_prune_checkpoint(
PrunePart::SenderRecovery,
PruneCheckpoint { block_number: to_block, prune_mode },
)?;
Ok(())
}
}
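
`prune_table_in_batches` comes from `reth_provider` and its signature is not shown in this diff. As a self-contained illustration of the pattern the calls above rely on, batched range deletion with a per-batch progress callback, sketched over a plain `BTreeMap` instead of an MDBX table:

use std::collections::BTreeMap;
use std::ops::RangeInclusive;

// Delete every key in `range`, at most `batch_size` per batch, reporting each
// batch's size so the caller can log progress (as the pruner's trace! does).
fn prune_in_batches<V>(
    table: &mut BTreeMap<u64, V>,
    range: RangeInclusive<u64>,
    batch_size: usize,
    mut on_batch: impl FnMut(usize),
) {
    let keys: Vec<u64> = table.range(range).map(|(key, _)| *key).collect();
    for batch in keys.chunks(batch_size) {
        for key in batch {
            table.remove(key);
        }
        on_batch(batch.len());
    }
}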
#[cfg(test)]
@@ -409,4 +463,71 @@ mod tests {
// ended last time
test_prune(20);
}
#[test]
fn prune_transaction_senders() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut transaction_senders = Vec::new();
for block in &blocks {
for transaction in &block.body {
transaction_senders.push((
transaction_senders.len() as u64,
transaction.recover_signer().expect("recover signer"),
));
}
}
tx.insert_transaction_senders(transaction_senders).expect("insert transaction senders");
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::TxSenders>().unwrap().len()
);
let test_prune = |to_block: BlockNumber| {
let prune_mode = PruneMode::Before(to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
0,
PruneModes { sender_recovery: Some(prune_mode), ..Default::default() },
BatchSizes {
// Less than total amount of blocks to prune to test the batching logic
transaction_senders: 10,
..Default::default()
},
);
let provider = tx.inner_rw();
assert_matches!(
pruner.prune_transaction_senders(&provider, to_block, prune_mode),
Ok(())
);
provider.commit().expect("commit");
assert_eq!(
tx.table::<tables::TxSenders>().unwrap().len(),
blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(),
Some(PruneCheckpoint { block_number: to_block, prune_mode })
);
};
// Pruning first time ever, no previous checkpoint is present
test_prune(10);
// Prune second time, previous checkpoint is present, should continue pruning from where
// ended last time
test_prune(20);
}
}

View File

@@ -11,9 +11,11 @@ use reth_interfaces::consensus;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
-TransactionSignedNoHash, TxNumber, H160,
+PrunePart, TransactionSignedNoHash, TxNumber, H160,
};
-use reth_provider::{BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError};
+use reth_provider::{
+    BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
+};
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::mpsc;
@@ -207,9 +209,20 @@ fn recover_sender(
fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
-) -> Result<EntitiesCheckpoint, DatabaseError> {
+) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::SenderRecovery)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.unwrap_or_default();
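// Example: a checkpoint at a block whose body ends at last_tx_num = 99 means
// transactions 0..=99 were pruned, i.e. 100 entries, hence the +1.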
Ok(EntitiesCheckpoint {
-processed: provider.tx_ref().entries::<tables::TxSenders>()? as u64,
+// If the `TxSenders` table was pruned, its entry count no longer matches the
+// actual number of processed transactions. Compensate by adding the number of
+// pruned `TxSenders` entries.
+processed: provider.tx_ref().entries::<tables::TxSenders>()? as u64 + pruned_entries,
total: provider.tx_ref().entries::<tables::Transactions>()? as u64,
})
}
@@ -239,9 +252,10 @@ mod tests {
generators::{random_block, random_block_range},
};
use reth_primitives::{
-stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256,
+stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock,
+TransactionSigned, H256, MAINNET,
};
-use reth_provider::TransactionsProvider;
+use reth_provider::{ProviderFactory, PruneCheckpointWriter, TransactionsProvider};
use super::*;
use crate::test_utils::{
@@ -366,6 +380,58 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}
#[test]
fn stage_checkpoint_pruned() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let max_pruned_block = 30;
let max_processed_block = 70;
let mut tx_senders = Vec::new();
let mut tx_number = 0;
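// Simulate a partially pruned DB: senders exist only for blocks after
// `max_pruned_block`, while `tx_number` keeps advancing through the pruned range.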
for block in &blocks[..=max_processed_block] {
for transaction in &block.body {
if block.number > max_pruned_block {
tx_senders
.push((tx_number, transaction.recover_signer().expect("recover signer")));
}
tx_number += 1;
}
}
tx.insert_transaction_senders(tx_senders).expect("insert transaction senders");
let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::SenderRecovery,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
prune_mode: PruneMode::Full,
},
)
.expect("save stage checkpoint");
provider.commit().expect("commit");
let db = tx.inner_raw();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().expect("provider rw");
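// `processed` should count both the pruned senders (blocks 0..=max_pruned_block)
// and the stored ones (up to max_processed_block); `total` covers all 101 blocks.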
assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {
processed: blocks[..=max_processed_block]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
total: blocks.iter().map(|block| block.body.len() as u64).sum::<u64>()
}
);
}
struct SenderRecoveryTestRunner {
tx: TestTransaction,
threshold: u64,

View File

@@ -293,6 +293,18 @@ impl TestTransaction {
})
}
pub fn insert_transaction_senders<I>(&self, transaction_senders: I) -> Result<(), DbError>
where
I: IntoIterator<Item = (TxNumber, Address)>,
{
self.commit(|tx| {
transaction_senders.into_iter().try_for_each(|(tx_num, sender)| {
// Insert into the TxSenders table.
tx.put::<tables::TxSenders>(tx_num, sender)
})
})
}
/// Insert collection of ([Address], [Account]) into corresponding tables.
pub fn insert_accounts_and_storages<I, S>(&self, accounts: I) -> Result<(), DbError>
where