feat: prune pre merge transaction files (#16702)

Matthias Seitz
2025-06-24 14:41:22 +02:00
committed by GitHub
parent b8e3f673dd
commit f5680e74d5
12 changed files with 257 additions and 31 deletions

View File

@@ -474,7 +474,7 @@ alloy-sol-macro = "1.2.0"
alloy-sol-types = { version = "1.2.0", default-features = false }
alloy-trie = { version = "0.9.0", default-features = false }
alloy-hardforks = "0.2.2"
alloy-hardforks = "0.2.7"
alloy-consensus = { version = "1.0.12", default-features = false }
alloy-contract = { version = "1.0.12", default-features = false }

View File

@@ -721,6 +721,15 @@ Pruning:
--prune.storagehistory.before <BLOCK_NUMBER>
Prune storage history before the specified block number. The specified block number is not pruned
--prune.bodies.pre-merge
Prune bodies before the merge block
--prune.bodies.distance <BLOCKS>
Prune bodies before the `head-N` block number. In other words, keep last N + 1 blocks
--prune.bodies.before <BLOCK_NUMBER>
Prune bodies before the specified block number. The specified block number is not pruned
Engine:
--engine.persistence-threshold <PERSISTENCE_THRESHOLD>
Configure persistence threshold for engine experimental

View File

@@ -463,6 +463,7 @@ impl PruneConfig {
receipts,
account_history,
storage_history,
bodies_history,
receipts_log_filter,
},
} = other;
@@ -478,6 +479,7 @@ impl PruneConfig {
self.segments.receipts = self.segments.receipts.or(receipts);
self.segments.account_history = self.segments.account_history.or(account_history);
self.segments.storage_history = self.segments.storage_history.or(storage_history);
self.segments.bodies_history = self.segments.bodies_history.or(bodies_history);
if self.segments.receipts_log_filter.0.is_empty() && !receipts_log_filter.0.is_empty() {
self.segments.receipts_log_filter = receipts_log_filter;
@@ -998,6 +1000,7 @@ receipts = 'full'
receipts: Some(PruneMode::Distance(1000)),
account_history: None,
storage_history: Some(PruneMode::Before(5000)),
bodies_history: None,
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([(
Address::random(),
PruneMode::Full,
@@ -1013,6 +1016,7 @@ receipts = 'full'
receipts: Some(PruneMode::Full),
account_history: Some(PruneMode::Distance(2000)),
storage_history: Some(PruneMode::Distance(3000)),
bodies_history: None,
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([
(Address::random(), PruneMode::Distance(1000)),
(Address::random(), PruneMode::Before(2000)),
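
The merge logic above resolves each segment with `Option::or`: a value already set on `self` (e.g. from the CLI) wins, otherwise the value from `other` (the toml config) is used. A minimal sketch of that precedence rule, with plain `u64` values standing in for `PruneMode`:

```rust
// Minimal sketch of the per-segment precedence used above; u64 stands in
// for PruneMode (hypothetical simplification).
fn merge_segment(cli: Option<u64>, toml: Option<u64>) -> Option<u64> {
    cli.or(toml) // CLI-set value wins; otherwise fall back to the toml value
}

fn main() {
    assert_eq!(merge_segment(Some(1000), Some(5000)), Some(1000)); // CLI takes precedence
    assert_eq!(merge_segment(None, Some(5000)), Some(5000)); // falls back to toml
    assert_eq!(merge_segment(None, None), None); // stays unset
}
```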

View File

@@ -5,11 +5,12 @@ use crate::{
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use alloy_consensus::BlockHeader as _;
use alloy_eips::eip2124::Head;
use alloy_primitives::{BlockNumber, B256};
use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
@@ -41,8 +42,9 @@ use reth_node_metrics::{
};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
BlockHashReader, BlockNumReader, BlockReaderIdExt, ChainSpecProvider, ProviderError,
ProviderFactory, ProviderResult, StageCheckpointReader, StateProviderFactory,
StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_api::clients::EthApiClient;
@@ -85,10 +87,13 @@ impl LaunchContext {
/// `config`.
///
/// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
pub fn with_loaded_toml_config<ChainSpec>(
self,
config: NodeConfig<ChainSpec>,
) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
where
ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
{
let toml_config = self.load_toml_config(&config)?;
Ok(self.with(WithConfigs { config, toml_config }))
}
@@ -97,10 +102,13 @@ impl LaunchContext {
/// `config`.
///
/// This is async because the trusted peers may have to be resolved.
pub fn load_toml_config<ChainSpec: EthChainSpec>(
pub fn load_toml_config<ChainSpec>(
&self,
config: &NodeConfig<ChainSpec>,
) -> eyre::Result<reth_config::Config> {
) -> eyre::Result<reth_config::Config>
where
ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
{
let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
let mut toml_config = reth_config::Config::from_path(&config_path)
@@ -117,11 +125,14 @@ impl LaunchContext {
}
/// Save prune config to the toml file if node is a full node.
fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
fn save_pruning_config_if_full_node<ChainSpec>(
reth_config: &mut reth_config::Config,
config: &NodeConfig<ChainSpec>,
config_path: impl AsRef<std::path::Path>,
) -> eyre::Result<()> {
) -> eyre::Result<()>
where
ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
{
if reth_config.prune.is_none() {
if let Some(prune_config) = config.prune_config() {
reth_config.update_prune_config(prune_config);
@@ -340,7 +351,10 @@ impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpe
/// Returns the configured [`PruneConfig`]
///
/// Any configuration set in CLI will take precedence over those set in toml
pub fn prune_config(&self) -> Option<PruneConfig> {
pub fn prune_config(&self) -> Option<PruneConfig>
where
ChainSpec: reth_chainspec::EthereumHardforks,
{
let Some(mut node_prune_config) = self.node_config().prune_config() else {
// No CLI config is set, use the toml config.
return self.toml_config().prune.clone();
@@ -352,12 +366,18 @@ impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpe
}
/// Returns the configured [`PruneModes`], returning the default if no config was available.
pub fn prune_modes(&self) -> PruneModes {
pub fn prune_modes(&self) -> PruneModes
where
ChainSpec: reth_chainspec::EthereumHardforks,
{
self.prune_config().map(|config| config.segments).unwrap_or_default()
}
/// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
pub fn pruner_builder(&self) -> PrunerBuilder {
pub fn pruner_builder(&self) -> PrunerBuilder
where
ChainSpec: reth_chainspec::EthereumHardforks,
{
PrunerBuilder::new(self.prune_config().unwrap_or_default())
.delete_limit(self.chain_spec().prune_delete_limit())
.timeout(PrunerBuilder::DEFAULT_TIMEOUT)
@@ -873,6 +893,36 @@ where
Ok(None)
}
/// Expire the pre-merge transactions if the node is configured to do so and the chain has a
/// merge block.
///
/// If the node is configured to prune pre-merge transactions and it has synced past the merge
/// block, it will delete the pre-merge transaction static files if they still exist.
pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
where
T: FullNodeTypes<Provider: StaticFileProviderFactory>,
{
if self.node_config().pruning.bodies_pre_merge {
if let Some(merge_block) =
self.chain_spec().ethereum_fork_activation(EthereumHardfork::Paris).block_number()
{
// Ensure we only expire transactions after we synced past the merge block.
let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
if latest.number() > merge_block {
let provider = self.blockchain_db().static_file_provider();
if provider.get_lowest_transaction_static_file_block() < Some(merge_block) {
info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
provider.delete_transactions_below(merge_block)?;
} else {
debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
}
}
}
}
Ok(())
}
/// Returns the metrics sender.
pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
self.right().db_provider_container.metrics_sender.clone()
@@ -1095,7 +1145,10 @@ mod tests {
storage_history_full: false,
storage_history_distance: None,
storage_history_before: None,
bodies_pre_merge: false,
bodies_distance: None,
receipts_log_filter: None,
bodies_before: None,
},
..NodeConfig::test()
};
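
One subtlety in `expire_pre_merge_transactions` above: the guard compares `Option<BlockNumber>` values directly, and `Option`'s derived ordering places `None` below every `Some`. The check therefore also passes when no transaction static files exist at all, in which case `delete_transactions_below` simply finds nothing to delete. A small illustration (15_537_394 is mainnet's Paris activation block):

```rust
// Option<T>'s derived Ord places None below any Some(_), so
// `get_lowest_transaction_static_file_block() < Some(merge_block)`
// is also true when there are no transaction static files at all.
fn main() {
    let merge_block = Some(15_537_394u64); // mainnet Paris activation block
    assert!(None::<u64> < merge_block); // no files on disk
    assert!(Some(499_999) < merge_block); // pre-merge files still present
    assert!(!(Some(15_999_999) < merge_block)); // already expired past the merge
}
```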

View File

@@ -130,6 +130,9 @@ where
})?
.with_components(components_builder, on_component_initialized).await?;
// Try to expire pre-merge transaction history if configured
ctx.expire_pre_merge_transactions()?;
// spawn exexs
let exex_manager_handle = ExExLauncher::new(
ctx.head(),

View File

@@ -1,8 +1,9 @@
//! Pruning and full node arguments
use crate::args::error::ReceiptsLogError;
use crate::{args::error::ReceiptsLogError, primitives::EthereumHardfork};
use alloy_primitives::{Address, BlockNumber};
use clap::{builder::RangedU64ValueParser, Args};
use reth_chainspec::EthereumHardforks;
use reth_config::config::PruneConfig;
use reth_prune_types::{PruneMode, PruneModes, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE};
use std::collections::BTreeMap;
@@ -86,11 +87,27 @@ pub struct PruningArgs {
/// pruned.
#[arg(long = "prune.storagehistory.before", value_name = "BLOCK_NUMBER", conflicts_with_all = &["storage_history_full", "storage_history_distance"])]
pub storage_history_before: Option<BlockNumber>,
// Bodies
/// Prune bodies before the merge block.
#[arg(long = "prune.bodies.pre-merge", value_name = "BLOCKS", conflicts_with_all = &["bodies_distance", "bodies_before"])]
pub bodies_pre_merge: bool,
/// Prune bodies before the `head-N` block number. In other words, keep last N + 1
/// blocks.
#[arg(long = "prune.bodies.distance", value_name = "BLOCKS", conflicts_with_all = &["bodies_pre_merge", "bodies_before"])]
pub bodies_distance: Option<u64>,
/// Prune bodies before the specified block number. The specified block number is not
/// pruned.
#[arg(long = "prune.bodies.before", value_name = "BLOCK_NUMBER", conflicts_with_all = &["bodies_distance", "bodies_pre_merge"])]
pub bodies_before: Option<BlockNumber>,
}
impl PruningArgs {
/// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig> {
pub fn prune_config<ChainSpec>(&self, chain_spec: &ChainSpec) -> Option<PruneConfig>
where
ChainSpec: EthereumHardforks,
{
// Initialise with a default prune configuration.
let mut config = PruneConfig::default();
@@ -104,6 +121,8 @@ impl PruningArgs {
receipts: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
// TODO: set default to pre-merge block if available
bodies_history: None,
receipts_log_filter: Default::default(),
},
}
@@ -125,6 +144,9 @@ impl PruningArgs {
if let Some(mode) = self.account_history_prune_mode() {
config.segments.account_history = Some(mode);
}
if let Some(mode) = self.bodies_prune_mode(chain_spec) {
config.segments.bodies_history = Some(mode);
}
if let Some(mode) = self.storage_history_prune_mode() {
config.segments.storage_history = Some(mode);
}
@@ -140,6 +162,22 @@ impl PruningArgs {
Some(config)
}
fn bodies_prune_mode<ChainSpec>(&self, chain_spec: &ChainSpec) -> Option<PruneMode>
where
ChainSpec: EthereumHardforks,
{
if self.bodies_pre_merge {
chain_spec
.ethereum_fork_activation(EthereumHardfork::Paris)
.block_number()
.map(PruneMode::Before)
} else if let Some(distance) = self.bodies_distance {
Some(PruneMode::Distance(distance))
} else {
self.bodies_before.map(PruneMode::Before)
}
}
const fn sender_recovery_prune_mode(&self) -> Option<PruneMode> {
if self.sender_recovery_full {
Some(PruneMode::Full)
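
The new `bodies_prune_mode` resolves the three mutually exclusive flags into a single `PruneMode`, with `--prune.bodies.pre-merge` looking up the Paris activation block from the chain spec. A simplified sketch of the mapping, with the chain-spec lookup replaced by a plain `Option<u64>` (15_537_394 on mainnet):

```rust
// Simplified sketch of the flag -> PruneMode mapping above; `paris_block`
// stands in for chain_spec.ethereum_fork_activation(EthereumHardfork::Paris)
// .block_number().
#[derive(Debug, PartialEq)]
enum PruneMode {
    Distance(u64),
    Before(u64),
}

fn bodies_prune_mode(
    pre_merge: bool,
    distance: Option<u64>,
    before: Option<u64>,
    paris_block: Option<u64>,
) -> Option<PruneMode> {
    if pre_merge {
        paris_block.map(PruneMode::Before) // chains without a merge block yield None
    } else if let Some(distance) = distance {
        Some(PruneMode::Distance(distance))
    } else {
        before.map(PruneMode::Before)
    }
}

fn main() {
    // --prune.bodies.pre-merge on mainnet
    assert_eq!(
        bodies_prune_mode(true, None, None, Some(15_537_394)),
        Some(PruneMode::Before(15_537_394))
    );
    // --prune.bodies.distance 100000
    assert_eq!(
        bodies_prune_mode(false, Some(100_000), None, Some(15_537_394)),
        Some(PruneMode::Distance(100_000))
    );
}
```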

View File

@@ -14,7 +14,7 @@ use alloy_primitives::{BlockNumber, B256};
use eyre::eyre;
use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET};
use reth_config::config::PruneConfig;
use reth_ethereum_forks::Head;
use reth_ethereum_forks::{EthereumHardforks, Head};
use reth_network_p2p::headers::client::HeadersClient;
use reth_primitives_traits::SealedHeader;
use reth_stages_types::StageId;
@@ -288,8 +288,11 @@ impl<ChainSpec> NodeConfig<ChainSpec> {
}
/// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig> {
self.pruning.prune_config()
pub fn prune_config(&self) -> Option<PruneConfig>
where
ChainSpec: EthereumHardforks,
{
self.pruning.prune_config(&self.chain)
}
/// Returns the max block that the node should run to, looking it up from the network if

View File

@@ -64,6 +64,7 @@ where
receipts,
account_history,
storage_history,
bodies_history: _,
receipts_log_filter,
} = prune_modes;
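
The destructuring above lists every field explicitly instead of using `..`, so the pattern is exhaustive: adding `bodies_history` to `PruneModes` broke this site at compile time and forced the `bodies_history: _` acknowledgement. A toy sketch of why that design choice is useful:

```rust
// Toy sketch (hypothetical struct): exhaustive destructuring without `..`
// surfaces every use-site when a new field is added, instead of silently
// ignoring the new configuration.
struct Modes {
    receipts: Option<u8>,
    bodies_history: Option<u8>, // newly added field
}

fn receipts_only(modes: Modes) -> Option<u8> {
    // Without `bodies_history: _`, this pattern stops compiling once the
    // field exists, so the new segment cannot be overlooked here.
    let Modes { receipts, bodies_history: _ } = modes;
    receipts
}

fn main() {
    assert_eq!(receipts_only(Modes { receipts: Some(1), bodies_history: None }), Some(1));
}
```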

View File

@@ -15,6 +15,7 @@ use reth_prune_types::{
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
/// The type responsible for pruning transactions in the database and for history expiry.
#[derive(Debug)]
pub struct Transactions<N> {
static_file_provider: StaticFileProvider<N>,

View File

@@ -75,6 +75,15 @@ pub struct PruneModes {
)
)]
pub storage_history: Option<PruneMode>,
/// Bodies History pruning configuration.
#[cfg_attr(
any(test, feature = "serde"),
serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<MINIMUM_PRUNING_DISTANCE, _>"
)
)]
pub bodies_history: Option<PruneMode>,
/// Receipts pruning configuration by retaining only those receipts that contain logs emitted
/// by the specified addresses, discarding others. This setting is overridden by `receipts`.
///
@@ -97,6 +106,7 @@ impl PruneModes {
receipts: Some(PruneMode::Full),
account_history: Some(PruneMode::Full),
storage_history: Some(PruneMode::Full),
bodies_history: Some(PruneMode::Full),
receipts_log_filter: Default::default(),
}
}
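
Both serde attributes on `bodies_history` do work here: `skip_serializing_if` keeps an unset segment out of the written config file, and the custom deserializer enforces the minimum pruning distance. A toy sketch of the skip behavior, assuming the `toml` crate and a stand-in struct:

```rust
// Toy sketch of `skip_serializing_if = "Option::is_none"`: an unset segment
// is omitted from the serialized config entirely (stand-in struct, not reth's).
use serde::Serialize;

#[derive(Serialize)]
struct Segments {
    #[serde(skip_serializing_if = "Option::is_none")]
    bodies_history: Option<String>,
}

fn main() {
    let set = Segments { bodies_history: Some("full".into()) };
    let unset = Segments { bodies_history: None };
    assert_eq!(toml::to_string(&set).unwrap(), "bodies_history = \"full\"\n");
    assert_eq!(toml::to_string(&unset).unwrap(), ""); // field absent entirely
}
```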

View File

@@ -240,6 +240,7 @@ impl<H: NippyJarHeader> NippyJar<H> {
[self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
{
if path.exists() {
debug!(target: "nippy-jar", ?path, "Removing file.");
reth_fs_util::remove_file(path)?;
}
}

View File

@@ -50,9 +50,9 @@ use std::{
marker::PhantomData,
ops::{Deref, Range, RangeBounds, RangeInclusive},
path::{Path, PathBuf},
sync::{mpsc, Arc},
sync::{atomic::AtomicU64, mpsc, Arc},
};
use tracing::{info, trace, warn};
use tracing::{debug, info, trace, warn};
/// Alias type for a map that can be queried for block ranges from a transaction
/// segment respectively. It uses `TxNumber` to represent the transaction end of a static file
@@ -229,6 +229,26 @@ pub struct StaticFileProviderInner<N> {
/// Maintains a map which allows for concurrent access to different `NippyJars`, over different
/// segments and ranges.
map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
/// Min static file block for each segment.
/// This index is initialized on launch to keep track of the lowest, non-expired static files.
///
/// This tracks the lowest static file per segment together with the highest block in that
/// file. E.g. if static files are batched in 500K block intervals, the lowest static file
/// covers [0..499K] and its highest block is 499K.
/// This index is mainly used for history expiry, which targets transactions; e.g. pre-merge
/// history expiry removes all static files below the merge height.
static_files_min_block: RwLock<HashMap<StaticFileSegment, u64>>,
/// An additional index that tracks the expired height, i.e. the highest block number that
/// has been expired (is missing). The first non-expired block is
/// `expired_history_height + 1`.
///
/// This effectively records the transaction range that has been expired via
/// [`StaticFileProvider::delete_transactions_below`] and mirrors
/// `static_files_min_block[transactions] - blocks_per_file`.
///
/// This additional tracker exists for more efficient lookups, because the node must be aware
/// of the expired height.
expired_history_height: AtomicU64,
/// Max static file block for each segment
static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
/// Available static file block ranges on disk indexed by max transactions.
@@ -261,6 +281,8 @@ impl<N: NodePrimitives> StaticFileProviderInner<N> {
let provider = Self {
map: Default::default(),
writers: Default::default(),
static_files_min_block: Default::default(),
expired_history_height: Default::default(),
static_files_max_block: Default::default(),
static_files_tx_index: Default::default(),
path: path.as_ref().to_path_buf(),
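
Following the field docs above, a worked example (assuming 500K blocks per file) of how the two new trackers relate once the first two transaction files have been expired:

```rust
// Worked example under the 500K-blocks-per-file assumption from the docs:
// files 0..=499_999 and 500_000..=999_999 were expired, so the lowest
// remaining transaction file covers 1_000_000..=1_499_999.
fn main() {
    let blocks_per_file = 500_000u64;
    // static_files_min_block[Transactions]: highest block of the lowest file.
    let min_block = 1_499_999u64;
    // Mirrors `static_files_min_block[transactions] - blocks_per_file`.
    let expired_history_height = min_block - blocks_per_file;
    assert_eq!(expired_history_height, 999_999);
    assert_eq!(expired_history_height + 1, 1_000_000); // first non-expired block
}
```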
@@ -422,26 +444,71 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
self.map.remove(&(fixed_block_range_end, segment));
}
/// This handles history expiry by deleting all transaction static files below the given block.
///
/// For example, if the block is 1M and there are 500K blocks per file, this will delete all
/// files that lie entirely below 1M, i.e. 0-499K and 500K-999K.
///
/// This will not delete the file that contains the block itself, because files can only be
/// removed entirely.
pub fn delete_transactions_below(&self, block: BlockNumber) -> ProviderResult<()> {
// Nothing to delete if block is 0.
if block == 0 {
return Ok(())
}
loop {
let Some(block_height) =
self.get_lowest_static_file_block(StaticFileSegment::Transactions)
else {
return Ok(())
};
if block_height >= block {
return Ok(())
}
debug!(
target: "provider::static_file",
?block_height,
"Deleting transaction static file below block"
);
// Now we need to wipe the static file; this will take care of updating the index and
// advancing the lowest tracked block height for the transactions segment.
self.delete_jar(StaticFileSegment::Transactions, block_height)
.inspect_err(|err| {
warn!(target: "provider::static_file", %block_height, ?err, "Failed to delete transaction static file below block")
})?;
}
}
/// Given a segment and block, it deletes the jar and all files from the respective block range.
///
/// CAUTION: destructive. Deletes files on disk.
///
/// This will re-initialize the index after deletion, so all files are tracked.
pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
let fixed_block_range = self.find_fixed_range(block);
let key = (fixed_block_range.end(), segment);
let jar = if let Some((_, jar)) = self.map.remove(&key) {
jar.jar
} else {
NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
.map_err(ProviderError::other)?
let file = self.path.join(segment.filename(&fixed_block_range));
debug!(
target: "provider::static_file",
?file,
?fixed_block_range,
?block,
"Loading static file jar for deletion"
);
NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
};
jar.delete().map_err(ProviderError::other)?;
let mut segment_max_block = None;
if fixed_block_range.start() > 0 {
segment_max_block = Some(fixed_block_range.start() - 1)
};
self.update_index(segment, segment_max_block)?;
self.initialize_index()?;
Ok(())
}
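
A hedged usage sketch of `delete_transactions_below`, assuming a `StaticFileProvider` handle, 500K-block files, and mainnet's Paris block: every transaction file that ends below the target is removed whole, and the file containing the target block is kept.

```rust
// Hedged usage sketch: expire all pre-merge transaction files on mainnet.
// With the assumptions above, this removes files 0..=499_999 through
// 15_000_000..=15_499_999 and keeps 15_500_000..=15_999_999 (which contains
// block 15_537_394), because files can only be removed whole.
fn expire_mainnet_pre_merge<N: NodePrimitives>(
    provider: &StaticFileProvider<N>,
) -> ProviderResult<()> {
    let merge_block = 15_537_394; // mainnet Paris activation block
    provider.delete_transactions_below(merge_block)
}
```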
@@ -597,16 +664,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
/// Initializes the inner transaction and block index
pub fn initialize_index(&self) -> ProviderResult<()> {
let mut min_block = self.static_files_min_block.write();
let mut max_block = self.static_files_max_block.write();
let mut tx_index = self.static_files_tx_index.write();
min_block.clear();
max_block.clear();
tx_index.clear();
for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
// Update last block for each segment
if let Some((block_range, _)) = ranges.last() {
max_block.insert(segment, block_range.end());
// Update first and last block for each segment
if let Some((first_block_range, _)) = ranges.first() {
min_block.insert(segment, first_block_range.end());
}
if let Some((last_block_range, _)) = ranges.last() {
max_block.insert(segment, last_block_range.end());
}
// Update tx -> block_range index
@@ -629,6 +701,11 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
// If this is a re-initialization, we need to clear this as well
self.map.clear();
// initialize the expired history height to the lowest static file block
if let Some(lowest_block) = min_block.get(&StaticFileSegment::Transactions) {
self.expired_history_height.store(*lowest_block, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
}
@@ -938,7 +1015,33 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
Ok(None)
}
/// Gets the highest static file block if it exists for a static file segment.
/// Returns the highest block number that has been expired from the history (missing).
///
/// The earliest block that is still available in the static files is `expired_history_height +
/// 1`.
pub fn expired_history_height(&self) -> BlockNumber {
self.expired_history_height.load(std::sync::atomic::Ordering::Relaxed)
}
/// Gets the lowest transaction static file block if it exists.
///
/// For example, if the lowest transactions static file has blocks 0-499, this will return 499.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_transaction_static_file_block(&self) -> Option<BlockNumber> {
self.get_lowest_static_file_block(StaticFileSegment::Transactions)
}
/// Gets the lowest static file's block height if it exists for a static file segment.
///
/// For example, if the lowest static file for the segment has blocks 0-499, this will return 499.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_lowest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
self.static_files_min_block.read().get(&segment).copied()
}
/// Gets the highest static file's block height if it exists for a static file segment.
///
/// If there is nothing on disk for the given segment, this will return [`None`].
pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {