feat(pipeline): prune receipts based on log emitters (#4044)

This commit is contained in:
joshieDo
2023-08-16 17:38:36 +01:00
committed by GitHub
parent 6edbc0eeaf
commit 8a2c3abd2a
12 changed files with 247 additions and 54 deletions

View File

@@ -279,7 +279,7 @@ impl Default for IndexHistoryConfig {
}
/// Pruning configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneConfig {
/// Minimum pruning interval measured in blocks.

View File

@@ -79,7 +79,10 @@ pub use net::{
SEPOLIA_BOOTNODES,
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError};
pub use prune::{
ContractLogsPruneConfig, PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError,
MINIMUM_PRUNING_DISTANCE,
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef};
pub use revm_primitives::JumpMap;
pub use serde_helper::JsonU256;

View File

@@ -3,7 +3,45 @@ mod mode;
mod part;
mod target;
use crate::{Address, BlockNumber};
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::{PrunePart, PrunePartError};
pub use target::PruneModes;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use target::{PruneModes, MINIMUM_PRUNING_DISTANCE};
/// Configuration for pruning receipts not associated with logs emitted by the specified contracts.
///
/// This is a newtype around a map that associates each contract [`Address`] with the
/// [`PruneMode`] configured for it.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct ContractLogsPruneConfig(pub BTreeMap<Address, PruneMode>);
impl ContractLogsPruneConfig {
    /// Returns `true` when no contract address is configured.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Given the `tip` block number, consolidates the structure so it can easily be queried for
    /// filtering across a range of blocks.
    ///
    /// The [`BlockNumber`] key of the map should be viewed as `PruneMode::Before(block)`: it is
    /// the first block whose receipts are retained for the addresses grouped under it.
    ///
    /// # Errors
    ///
    /// Propagates the [`PrunePartError`] returned by [`PruneMode::prune_target_block`] for any
    /// configured address.
    pub fn group_by_block(
        &self,
        tip: BlockNumber,
    ) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PrunePartError> {
        let mut map: BTreeMap<BlockNumber, Vec<&Address>> = BTreeMap::new();
        for (address, mode) in &self.0 {
            // Getting `None` means that there is nothing to prune yet, so the address has to be
            // included in the BTreeMap from the start (block = 0), otherwise it would be excluded.
            // Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
            // other receipts.
            //
            // `prune_target_block` reports the last block to prune, inclusive. Since the key is
            // read as `PruneMode::Before(key)`, the key has to be the block right after it.
            let first_kept = mode
                .prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
                .map(|(last_pruned, _)| last_pruned.saturating_add(1))
                .unwrap_or_default();
            map.entry(first_kept).or_default().push(address);
        }
        Ok(map)
    }
}

View File

@@ -1,4 +1,4 @@
use crate::BlockNumber;
use crate::{BlockNumber, PrunePart, PrunePartError};
use reth_codecs::{main_codec, Compact};
/// Prune mode.
@@ -14,6 +14,43 @@ pub enum PruneMode {
Before(BlockNumber),
}
impl PruneMode {
    /// Returns block up to which variant pruning needs to be done, inclusive, according to the
    /// provided tip.
    ///
    /// Returns `Ok(None)` when there is nothing to prune yet: a `Distance` larger than `tip`, a
    /// `Before` bound above `tip`, or `Before(0)`, which has no block below it.
    ///
    /// # Errors
    ///
    /// Returns [`PrunePartError::Configuration`] for `prune_part` when the mode would prune closer
    /// to the tip than `min_blocks` allows. This includes `Full` whenever `min_blocks` is not
    /// zero.
    pub fn prune_target_block(
        &self,
        tip: BlockNumber,
        min_blocks: u64,
        prune_part: PrunePart,
    ) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
        let result = match self {
            PruneMode::Full if min_blocks == 0 => Some((tip, *self)),
            PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet
            PruneMode::Distance(distance) if *distance >= min_blocks => {
                Some((tip - distance, *self))
            }
            PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet
            PruneMode::Before(n) if tip - n >= min_blocks => {
                // `Before(0)` has no preceding block. `checked_sub` turns that case into "nothing
                // to prune" instead of underflowing on `n - 1`.
                n.checked_sub(1).map(|target| (target, *self))
            }
            _ => return Err(PrunePartError::Configuration(prune_part)),
        };
        Ok(result)
    }

    /// Check if target block should be pruned according to the provided prune mode and tip.
    ///
    /// `Full` always prunes, `Distance(d)` prunes blocks strictly below `tip - d` (and nothing
    /// while `d` exceeds `tip`), and `Before(n)` prunes blocks strictly below `n`.
    pub fn should_prune(&self, block: BlockNumber, tip: BlockNumber) -> bool {
        match self {
            PruneMode::Full => true,
            PruneMode::Distance(distance) => {
                // Guard first so that `tip - distance` cannot underflow.
                if *distance > tip {
                    return false
                }
                block < tip - *distance
            }
            PruneMode::Before(n) => *n > block,
        }
    }
}
#[cfg(test)]
impl Default for PruneMode {
fn default() -> Self {
@@ -23,10 +60,78 @@ impl Default for PruneMode {
#[cfg(test)]
mod tests {
use crate::prune::PruneMode;
use crate::{prune::PruneMode, PrunePart, PrunePartError, MINIMUM_PRUNING_DISTANCE};
use assert_matches::assert_matches;
use serde::Deserialize;
#[test]
fn test_prune_target_block() {
    let tip = 1000;
    let min_blocks = MINIMUM_PRUNING_DISTANCE;
    let prune_part = PrunePart::Receipts;

    // Every case pairs a prune mode with the expected outcome, where a successful outcome only
    // names the expected target block (the mode is attached when comparing).
    let cases = vec![
        // `Full` is impossible while a minimum pruning distance is enforced
        (PruneMode::Full, Err(PrunePartError::Configuration(prune_part))),
        // Distance beyond the tip: nothing to prune
        (PruneMode::Distance(tip + 1), Ok(None)),
        (PruneMode::Distance(min_blocks + 1), Ok(Some(tip - min_blocks - 1))),
        // Bound beyond the tip: nothing to prune
        (PruneMode::Before(tip + 1), Ok(None)),
        (PruneMode::Before(tip - min_blocks), Ok(Some(tip - min_blocks - 1))),
        (PruneMode::Before(tip - min_blocks - 1), Ok(Some(tip - min_blocks - 2))),
        // Closer to the tip than MINIMUM_PRUNING_DISTANCE (128) allows
        (PruneMode::Before(tip - 1), Err(PrunePartError::Configuration(prune_part))),
    ];

    let mut case_number = 0;
    for (mode, expected_block) in cases {
        case_number += 1;
        let expected = match expected_block {
            Ok(block) => Ok(block.map(|number| (number, mode))),
            Err(error) => Err(error),
        };
        let actual = mode.prune_target_block(tip, min_blocks, prune_part);
        assert_eq!(actual, expected, "Test {} failed", case_number);
    }

    // Without a minimum distance, `Full` is valid and targets the tip itself
    let unrestricted = PruneMode::Full.prune_target_block(tip, 0, prune_part);
    assert_eq!(unrestricted, Ok(Some((tip, PruneMode::Full))));
}
#[test]
fn test_should_prune() {
    let tip = 1000;
    let minimal_distance = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1);
    // First block that `minimal_distance` leaves untouched
    let boundary = tip - MINIMUM_PRUNING_DISTANCE - 1;

    // (mode, block, whether the block is expected to be pruned)
    let cases = vec![
        (PruneMode::Distance(tip + 1), 1, false),
        (minimal_distance, boundary, false),
        (minimal_distance, boundary - 1, true),
        (PruneMode::Before(tip + 1), 1, true),
        (PruneMode::Before(tip + 1), tip + 1, false),
    ];

    for (position, &(mode, block, expected)) in cases.iter().enumerate() {
        let pruned = mode.should_prune(block, tip);
        assert_eq!(pruned, expected, "Test {} failed", position + 1);
    }
}
#[test]
fn prune_mode_deserialize() {
#[derive(Debug, Deserialize)]

View File

@@ -10,8 +10,10 @@ pub enum PrunePart {
SenderRecovery,
/// Prune part responsible for the `TxHashNumber` table.
TransactionLookup,
/// Prune part responsible for the `Receipts` table.
/// Prune part responsible for all `Receipts`.
Receipts,
/// Prune part responsible for some `Receipts` filtered by logs.
ContractLogs,
/// Prune part responsible for the `AccountChangeSet` and `AccountHistory` tables.
AccountHistory,
/// Prune part responsible for the `StorageChangeSet` and `StorageHistory` tables.
@@ -19,7 +21,7 @@ pub enum PrunePart {
}
/// PrunePart error type.
#[derive(Debug, Error)]
#[derive(Debug, Error, PartialEq, Eq)]
pub enum PrunePartError {
/// Invalid configuration of a prune part.
#[error("The configuration provided for {0} is invalid.")]

View File

@@ -1,12 +1,15 @@
use crate::{
prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber,
PruneMode, PrunePart,
ContractLogsPruneConfig, PruneMode, PrunePart,
};
use paste::paste;
use serde::{Deserialize, Serialize};
/// Minimum distance necessary from the tip so blockchain tree can work correctly.
pub const MINIMUM_PRUNING_DISTANCE: u64 = 128;
/// Pruning configuration for every part of the data that can be pruned.
#[derive(Debug, Clone, Default, Copy, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Debug, Clone, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneModes {
/// Sender Recovery pruning configuration.
@@ -20,7 +23,8 @@ pub struct PruneModes {
/// Transaction Lookup pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction_lookup: Option<PruneMode>,
/// Receipts pruning configuration.
/// Configuration for pruning of receipts. This setting overrides
/// `PruneModes::contract_logs_filter` and offers improved performance.
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
@@ -38,6 +42,12 @@ pub struct PruneModes {
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub storage_history: Option<PruneMode>,
/// Retains only those receipts that contain logs emitted by the specified addresses,
/// discarding all others. Note that this setting is overridden by `PruneModes::receipts`.
///
/// The [`BlockNumber`] represents the starting block from which point onwards the receipts are
/// preserved.
pub contract_logs_filter: ContractLogsPruneConfig,
}
macro_rules! impl_prune_parts {
@@ -51,7 +61,7 @@ macro_rules! impl_prune_parts {
)]
pub fn [<should_prune_ $part>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$part {
return self.should_prune(mode, block, tip)
return mode.should_prune(block, tip)
}
false
}
@@ -66,16 +76,8 @@ macro_rules! impl_prune_parts {
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
let min_blocks: u64 = $min_blocks.unwrap_or_default();
match self.$part {
Some(mode) => Ok(match mode {
PruneMode::Full if min_blocks == 0 => Some((tip, mode)),
PruneMode::Distance(distance) if distance > tip => None, // Nothing to prune yet
PruneMode::Distance(distance) if distance >= min_blocks => Some((tip - distance, mode)),
PruneMode::Before(n) if n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, mode)),
_ => return Err(PrunePartError::Configuration(PrunePart::$variant)),
}),
match self.$part {
Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PrunePart::$variant),
None => Ok(None)
}
}
@@ -88,6 +90,7 @@ macro_rules! impl_prune_parts {
$(
$part: Some(PruneMode::Full),
)+
contract_logs_filter: Default::default()
}
}
@@ -100,20 +103,6 @@ impl PruneModes {
PruneModes::default()
}
/// Decides whether `block` falls into the range pruned by `mode`, given the current `tip`.
///
/// `Full` prunes every block, `Distance` prunes blocks strictly below `tip - distance` (and
/// nothing while the distance exceeds the tip), `Before` prunes blocks strictly below its bound.
pub fn should_prune(&self, mode: &PruneMode, block: BlockNumber, tip: BlockNumber) -> bool {
    match *mode {
        PruneMode::Full => true,
        // The first operand short-circuits, so `tip - distance` is never evaluated when it
        // would underflow.
        PruneMode::Distance(distance) => distance <= tip && block < tip - distance,
        PruneMode::Before(bound) => block < bound,
    }
}
impl_prune_parts!(
(sender_recovery, SenderRecovery, Some(64)),
(transaction_lookup, TransactionLookup, None),

View File

@@ -199,7 +199,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
start_block: u64,
max_block: u64,
) -> Result<PruneModes, StageError> {
let mut prune_modes = self.prune_modes;
let mut prune_modes = self.prune_modes.clone();
// If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase
// changeset related pruning configurations

View File

@@ -173,34 +173,34 @@ mod tests {
// In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
// storage.
let mut prune = PruneModes::none();
check_pruning(factory.clone(), prune, 1, 3, 1).await;
check_pruning(factory.clone(), prune.clone(), 1, 3, 1).await;
prune.receipts = Some(PruneMode::Full);
prune.account_history = Some(PruneMode::Full);
prune.storage_history = Some(PruneMode::Full);
// This will result in error for account_history and storage_history, which is caught.
check_pruning(factory.clone(), prune, 0, 0, 0).await;
check_pruning(factory.clone(), prune.clone(), 0, 0, 0).await;
prune.receipts = Some(PruneMode::Before(1));
prune.account_history = Some(PruneMode::Before(1));
prune.storage_history = Some(PruneMode::Before(1));
check_pruning(factory.clone(), prune, 1, 3, 1).await;
check_pruning(factory.clone(), prune.clone(), 1, 3, 1).await;
prune.receipts = Some(PruneMode::Before(2));
prune.account_history = Some(PruneMode::Before(2));
prune.storage_history = Some(PruneMode::Before(2));
// The one account is the miner
check_pruning(factory.clone(), prune, 0, 1, 0).await;
check_pruning(factory.clone(), prune.clone(), 0, 1, 0).await;
prune.receipts = Some(PruneMode::Distance(66));
prune.account_history = Some(PruneMode::Distance(66));
prune.storage_history = Some(PruneMode::Distance(66));
check_pruning(factory.clone(), prune, 1, 3, 1).await;
check_pruning(factory.clone(), prune.clone(), 1, 3, 1).await;
prune.receipts = Some(PruneMode::Distance(64));
prune.account_history = Some(PruneMode::Distance(64));
prune.storage_history = Some(PruneMode::Distance(64));
// The one account is the miner
check_pruning(factory.clone(), prune, 0, 1, 0).await;
check_pruning(factory.clone(), prune.clone(), 0, 1, 0).await;
}
}

View File

@@ -6,9 +6,11 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
DatabaseError as DbError,
};
use reth_interfaces::Error;
use reth_primitives::{
bloom::logs_bloom, keccak256, proofs::calculate_receipt_root_ref, Account, Address,
BlockNumber, Bloom, Bytecode, Log, PruneMode, PruneModes, Receipt, StorageEntry, H256, U256,
BlockNumber, Bloom, Bytecode, Log, PruneMode, PruneModes, Receipt, StorageEntry, H256,
MINIMUM_PRUNING_DISTANCE, U256,
};
use reth_trie::{
hashed_cursor::{HashedPostState, HashedPostStateCursorFactory, HashedStorage},
@@ -600,7 +602,7 @@ impl PostState {
mut self,
tx: &TX,
tip: BlockNumber,
) -> Result<(), DbError> {
) -> Result<(), Error> {
self.write_history_to_db(tx, tip)?;
// Write new storage state
@@ -657,21 +659,64 @@ impl PostState {
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut receipts_cursor = tx.cursor_write::<tables::Receipts>()?;
let contract_log_pruner = self
.prune_modes
.contract_logs_filter
.group_by_block(tip)
.map_err(|e| Error::Custom(e.to_string()))?;
// An empty filter implies that addresses will only be added to it at a future block.
// `None` means that there isn't any kind of configuration.
let mut address_filter: Option<(u64, Vec<&Address>)> = None;
for (block, receipts) in self.receipts {
if self.prune_modes.should_prune_receipts(block, tip) {
// [`PrunePart::Receipts`] takes priority over [`PrunePart::ContractLogs`]
if receipts.is_empty() || self.prune_modes.should_prune_receipts(block, tip) {
continue
}
// All receipts from the last 128 blocks are required for blockchain tree, even with
// [`PrunePart::ContractLogs`].
let prunable_receipts =
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block, tip);
if prunable_receipts && !contract_log_pruner.is_empty() {
if address_filter.is_none() {
address_filter = Some((0, vec![]));
}
// Get all addresses higher than the previous checked block up to the current
// one
if let Some((prev_block, filter)) = &mut address_filter {
for (_, addresses) in contract_log_pruner.range(*prev_block..=block) {
filter.extend_from_slice(addresses.as_slice())
}
*prev_block = block;
}
}
let (_, body_indices) =
bodies_cursor.seek_exact(block)?.expect("body indices exist");
let tx_range = body_indices.tx_num_range();
assert_eq!(receipts.len(), tx_range.clone().count(), "Receipt length mismatch");
for (tx_num, receipt) in tx_range.zip(receipts) {
if prunable_receipts {
// If there is an address_filter, and it does not contain any of the
// contract addresses, then skip writing this
// receipt.
if let Some((_, filter)) = &address_filter {
if !receipt.logs.iter().any(|log| filter.contains(&&log.address)) {
continue
}
}
}
receipts_cursor.append(tx_num, receipt)?;
}
}
}
Ok(())
}
}