From 42c24f07d965f977303471ec92d78aa8ecbce96d Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Wed, 27 Nov 2024 15:43:38 +0400 Subject: [PATCH] refactor: unify code paths for receipts removal (#12887) --- crates/stages/stages/src/stages/bodies.rs | 46 +--- crates/stages/stages/src/stages/execution.rs | 218 ++++++++---------- crates/stages/stages/src/stages/utils.rs | 41 +++- .../src/providers/database/provider.rs | 98 ++++++-- crates/storage/provider/src/traits/block.rs | 18 +- crates/storage/provider/src/traits/state.rs | 12 +- crates/storage/provider/src/writer/mod.rs | 23 +- 7 files changed, 250 insertions(+), 206 deletions(-) diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index c2de929240..c1fde11c23 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -8,15 +8,13 @@ use reth_codecs::Compact; use reth_primitives_traits::{Block, BlockBody}; use tracing::*; -use alloy_primitives::TxNumber; use reth_db::{tables, transaction::DbTx}; use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut}; use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_primitives::StaticFileSegment; use reth_provider::{ - providers::{StaticFileProvider, StaticFileWriter}, - BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader, - StorageLocation, + providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError, + StaticFileProviderFactory, StatsReader, StorageLocation, }; use reth_stages_api::{ EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, @@ -24,6 +22,8 @@ use reth_stages_api::{ }; use reth_storage_errors::provider::ProviderResult; +use super::missing_static_data_error; + /// The body stage downloads block bodies. /// /// The body stage downloads block bodies for all block headers stored locally in storage. @@ -128,6 +128,7 @@ impl BodyStage { next_static_file_tx_num.saturating_sub(1), &static_file_provider, provider, + StaticFileSegment::Transactions, )?) } } else { @@ -135,6 +136,7 @@ impl BodyStage { next_static_file_tx_num.saturating_sub(1), &static_file_provider, provider, + StaticFileSegment::Transactions, )?) } } @@ -242,42 +244,6 @@ where } } -/// Called when database is ahead of static files. Attempts to find the first block we are missing -/// transactions for. -fn missing_static_data_error( - last_tx_num: TxNumber, - static_file_provider: &StaticFileProvider, - provider: &Provider, -) -> Result -where - Provider: BlockReader + StaticFileProviderFactory, -{ - let mut last_block = static_file_provider - .get_highest_static_file_block(StaticFileSegment::Transactions) - .unwrap_or_default(); - - // To be extra safe, we make sure that the last tx num matches the last block from its indices. - // If not, get it. - loop { - if let Some(indices) = provider.block_body_indices(last_block)? { - if indices.last_tx_num() <= last_tx_num { - break - } - } - if last_block == 0 { - break - } - last_block -= 1; - } - - let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - - Ok(StageError::MissingStaticFileData { - block: missing_block, - segment: StaticFileSegment::Transactions, - }) -} - // TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know // beforehand how many bytes we need to download. So the good solution would be to measure the // progress in gas as a proxy to size. Execution stage uses a similar approach. diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 3c31dea91f..297130c34c 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -14,7 +14,7 @@ use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; use reth_primitives::{SealedHeader, StaticFileSegment}; use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimitives}; use reth_provider::{ - providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter}, + providers::{StaticFileProvider, StaticFileWriter}, BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateChangeWriter, StateCommitmentProvider, StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant, @@ -35,6 +35,8 @@ use std::{ }; use tracing::*; +use super::missing_static_data_error; + /// The execution stage executes all transactions and /// update history indexes. /// @@ -169,6 +171,88 @@ impl ExecutionStage { } Ok(prune_modes) } + + /// Performs consistency check on static files. + /// + /// This function compares the highest receipt number recorded in the database with that in the + /// static file to detect any discrepancies due to unexpected shutdowns or database rollbacks. + /// **If the height in the static file is higher**, it rolls back (unwinds) the static file. + /// **Conversely, if the height in the database is lower**, it triggers a rollback in the + /// database (by returning [`StageError`]) until the heights in both the database and static + /// file match. + fn ensure_consistency( + &self, + provider: &Provider, + checkpoint: u64, + unwind_to: Option, + ) -> Result<(), StageError> + where + Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider, + { + // If thre's any receipts pruning configured, receipts are written directly to database and + // inconsistencies are expected. + if self.prune_modes.has_receipts_pruning() { + return Ok(()) + } + + // Get next expected receipt number + let tx = provider.tx_ref(); + let next_receipt_num = tx + .cursor_read::()? + .seek_exact(checkpoint)? + .map(|(_, value)| value.next_tx_num()) + .unwrap_or(0); + + let static_file_provider = provider.static_file_provider(); + + // Get next expected receipt number in static files + let next_static_file_receipt_num = static_file_provider + .get_highest_static_file_tx(StaticFileSegment::Receipts) + .map(|num| num + 1) + .unwrap_or(0); + + // Check if we had any unexpected shutdown after committing to static files, but + // NOT committing to database. + match next_static_file_receipt_num.cmp(&next_receipt_num) { + // It can be equal when it's a chain of empty blocks, but we still need to update the + // last block in the range. + Ordering::Greater | Ordering::Equal => { + let mut static_file_producer = + static_file_provider.latest_writer(StaticFileSegment::Receipts)?; + static_file_producer + .prune_receipts(next_static_file_receipt_num - next_receipt_num, checkpoint)?; + // Since this is a database <-> static file inconsistency, we commit the change + // straight away. + static_file_producer.commit()?; + } + Ordering::Less => { + // If we are already in the process of unwind, this might be fine because we will + // fix the inconsistency right away. + if let Some(unwind_to) = unwind_to { + let next_receipt_num_after_unwind = provider + .tx_ref() + .get::(unwind_to)? + .map(|b| b.next_tx_num()) + .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?; + + if next_receipt_num_after_unwind > next_static_file_receipt_num { + // This means we need a deeper unwind. + } else { + return Ok(()) + } + } + + return Err(missing_static_data_error( + next_static_file_receipt_num.saturating_sub(1), + &static_file_provider, + provider, + StaticFileSegment::Receipts, + )?) + } + } + + Ok(()) + } } impl Stage for ExecutionStage @@ -209,20 +293,7 @@ where let prune_modes = self.adjust_prune_modes(provider, start_block, max_block)?; let static_file_provider = provider.static_file_provider(); - // We only use static files for Receipts, if there is no receipt pruning of any kind. - let write_receipts_to = if self.prune_modes.receipts.is_none() && - self.prune_modes.receipts_log_filter.is_empty() - { - debug!(target: "sync::stages::execution", start = start_block, "Preparing static file producer"); - let mut producer = - prepare_static_file_producer(provider, &static_file_provider, start_block)?; - // Since there might be a database <-> static file inconsistency (read - // `prepare_static_file_producer` for context), we commit the change straight away. - producer.commit()?; - StorageLocation::StaticFiles - } else { - StorageLocation::Database - }; + self.ensure_consistency(provider, input.checkpoint().block_number, None)?; let db = StateProviderDatabase(LatestStateProviderRef::new(provider)); let mut executor = self.executor_provider.batch_executor(db); @@ -361,7 +432,7 @@ where let time = Instant::now(); // write output - provider.write_to_storage(state, OriginalValuesKnown::Yes, write_receipts_to)?; + provider.write_to_storage(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?; let db_write_duration = time.elapsed(); debug!( @@ -408,10 +479,13 @@ where }) } + self.ensure_consistency(provider, input.checkpoint.block_number, Some(unwind_to))?; + // Unwind account and storage changesets, as well as receipts. // // This also updates `PlainStorageState` and `PlainAccountState`. - let bundle_state_with_receipts = provider.take_state_above(unwind_to)?; + let bundle_state_with_receipts = + provider.take_state_above(unwind_to, StorageLocation::Both)?; // Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent. if self.exex_manager_handle.has_exexs() { @@ -432,25 +506,6 @@ where } } - let static_file_provider = provider.static_file_provider(); - - // Unwind all receipts for transactions in the block range - if self.prune_modes.receipts.is_none() && self.prune_modes.receipts_log_filter.is_empty() { - // We only use static files for Receipts, if there is no receipt pruning of any kind. - - // prepare_static_file_producer does a consistency check that will unwind static files - // if the expected highest receipt in the files is higher than the database. - // Which is essentially what happens here when we unwind this stage. - let _static_file_producer = - prepare_static_file_producer(provider, &static_file_provider, *range.start())?; - } else { - // If there is any kind of receipt pruning/filtering we use the database, since static - // files do not support filters. - // - // If we hit this case, the receipts have already been unwound by the call to - // `take_state`. - } - // Update the checkpoint. let mut stage_checkpoint = input.checkpoint.execution_stage_checkpoint(); if let Some(stage_checkpoint) = stage_checkpoint.as_mut() { @@ -576,85 +631,6 @@ fn calculate_gas_used_from_headers( Ok(gas_total) } -/// Returns a `StaticFileProviderRWRefMut` static file producer after performing a consistency -/// check. -/// -/// This function compares the highest receipt number recorded in the database with that in the -/// static file to detect any discrepancies due to unexpected shutdowns or database rollbacks. **If -/// the height in the static file is higher**, it rolls back (unwinds) the static file. -/// **Conversely, if the height in the database is lower**, it triggers a rollback in the database -/// (by returning [`StageError`]) until the heights in both the database and static file match. -fn prepare_static_file_producer<'a, 'b, Provider>( - provider: &'b Provider, - static_file_provider: &'a StaticFileProvider, - start_block: u64, -) -> Result, StageError> -where - Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider, - 'b: 'a, -{ - // Get next expected receipt number - let tx = provider.tx_ref(); - let next_receipt_num = tx - .cursor_read::()? - .seek_exact(start_block)? - .map(|(_, value)| value.first_tx_num) - .unwrap_or(0); - - // Get next expected receipt number in static files - let next_static_file_receipt_num = static_file_provider - .get_highest_static_file_tx(StaticFileSegment::Receipts) - .map(|num| num + 1) - .unwrap_or(0); - - let mut static_file_producer = - static_file_provider.get_writer(start_block, StaticFileSegment::Receipts)?; - - // Check if we had any unexpected shutdown after committing to static files, but - // NOT committing to database. - match next_static_file_receipt_num.cmp(&next_receipt_num) { - // It can be equal when it's a chain of empty blocks, but we still need to update the last - // block in the range. - Ordering::Greater | Ordering::Equal => static_file_producer.prune_receipts( - next_static_file_receipt_num - next_receipt_num, - start_block.saturating_sub(1), - )?, - Ordering::Less => { - let mut last_block = static_file_provider - .get_highest_static_file_block(StaticFileSegment::Receipts) - .unwrap_or(0); - - let last_receipt_num = static_file_provider - .get_highest_static_file_tx(StaticFileSegment::Receipts) - .unwrap_or(0); - - // To be extra safe, we make sure that the last receipt num matches the last block from - // its indices. If not, get it. - loop { - if let Some(indices) = provider.block_body_indices(last_block)? { - if indices.last_tx_num() <= last_receipt_num { - break - } - } - if last_block == 0 { - break - } - last_block -= 1; - } - - let missing_block = - Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - - return Err(StageError::MissingStaticFileData { - block: missing_block, - segment: StaticFileSegment::Receipts, - }) - } - } - - Ok(static_file_producer) -} - #[cfg(test)] mod tests { use super::*; @@ -900,7 +876,7 @@ mod tests { // Tests node with database and node with static files for mut mode in modes { - let provider = factory.database_provider_rw().unwrap(); + let mut provider = factory.database_provider_rw().unwrap(); if let Some(mode) = &mut mode { // Simulating a full node where we write receipts to database @@ -909,6 +885,7 @@ mod tests { let mut execution_stage = stage(); execution_stage.prune_modes = mode.clone().unwrap_or_default(); + provider.set_prune_modes(mode.clone().unwrap_or_default()); let output = execution_stage.execute(&provider, input).unwrap(); provider.commit().unwrap(); @@ -973,9 +950,10 @@ mod tests { "Post changed of a account" ); - let provider = factory.database_provider_rw().unwrap(); + let mut provider = factory.database_provider_rw().unwrap(); let mut stage = stage(); - stage.prune_modes = mode.unwrap_or_default(); + stage.prune_modes = mode.clone().unwrap_or_default(); + provider.set_prune_modes(mode.unwrap_or_default()); let _result = stage .unwind( @@ -1050,6 +1028,7 @@ mod tests { // Test Execution let mut execution_stage = stage(); execution_stage.prune_modes = mode.clone().unwrap_or_default(); + provider.set_prune_modes(mode.clone().unwrap_or_default()); let result = execution_stage.execute(&provider, input).unwrap(); provider.commit().unwrap(); @@ -1057,7 +1036,8 @@ mod tests { // Test Unwind provider = factory.database_provider_rw().unwrap(); let mut stage = stage(); - stage.prune_modes = mode.unwrap_or_default(); + stage.prune_modes = mode.clone().unwrap_or_default(); + provider.set_prune_modes(mode.clone().unwrap_or_default()); let result = stage .unwind( diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index caf039faca..5aa1f3f880 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -1,5 +1,5 @@ //! Utils for `stages`. -use alloy_primitives::BlockNumber; +use alloy_primitives::{BlockNumber, TxNumber}; use reth_config::config::EtlConfig; use reth_db::BlockNumberList; use reth_db_api::{ @@ -10,7 +10,11 @@ use reth_db_api::{ DatabaseError, }; use reth_etl::Collector; -use reth_provider::DBProvider; +use reth_primitives::StaticFileSegment; +use reth_provider::{ + providers::StaticFileProvider, BlockReader, DBProvider, ProviderError, + StaticFileProviderFactory, +}; use reth_stages_api::StageError; use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; use tracing::info; @@ -244,3 +248,36 @@ impl LoadMode { matches!(self, Self::Flush) } } + +/// Called when database is ahead of static files. Attempts to find the first block we are missing +/// transactions for. +pub(crate) fn missing_static_data_error( + last_tx_num: TxNumber, + static_file_provider: &StaticFileProvider, + provider: &Provider, + segment: StaticFileSegment, +) -> Result +where + Provider: BlockReader + StaticFileProviderFactory, +{ + let mut last_block = + static_file_provider.get_highest_static_file_block(segment).unwrap_or_default(); + + // To be extra safe, we make sure that the last tx num matches the last block from its indices. + // If not, get it. + loop { + if let Some(indices) = provider.block_body_indices(last_block)? { + if indices.last_tx_num() <= last_tx_num { + break + } + } + if last_block == 0 { + break + } + last_block -= 1; + } + + let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); + + Ok(StageError::MissingStaticFileData { block: missing_block, segment }) +} diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 8e4d22067d..33c15280d7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -206,6 +206,12 @@ impl DatabaseProvider { Ok(Box::new(state_provider)) } + + #[cfg(feature = "test-utils")] + /// Sets the prune modes for provider. + pub fn set_prune_modes(&mut self, prune_modes: PruneModes) { + self.prune_modes = prune_modes; + } } impl NodePrimitivesProvider for DatabaseProvider { @@ -335,6 +341,34 @@ impl DatabaseProvider ProviderResult<()> { + if remove_from.database() { + // iterate over block body and remove receipts + self.remove::(from_tx..)?; + } + + if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() { + let static_file_receipt_num = + self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts); + + let to_delete = static_file_receipt_num + .map(|static_num| (static_num + 1).saturating_sub(from_tx)) + .unwrap_or_default(); + + self.static_file_provider + .latest_writer(StaticFileSegment::Receipts)? + .prune_receipts(to_delete, last_block)?; + } + + Ok(()) + } } impl TryIntoHistoricalStateProvider for DatabaseProvider { @@ -1951,7 +1985,11 @@ impl StateChangeWriter /// 1. Take the old value from the changeset /// 2. Take the new value from the local state /// 3. Set the local state to the value in the changeset - fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> { + fn remove_state_above( + &self, + block: BlockNumber, + remove_receipts_from: StorageLocation, + ) -> ProviderResult<()> { let range = block + 1..=self.last_block_number()?; if range.is_empty() { @@ -1964,8 +2002,6 @@ impl StateChangeWriter // get transaction receipts let from_transaction_num = block_bodies.first().expect("already checked if there are blocks").1.first_tx_num(); - let to_transaction_num = - block_bodies.last().expect("already checked if there are blocks").1.last_tx_num(); let storage_range = BlockNumberAddress::range(range.clone()); @@ -2018,8 +2054,7 @@ impl StateChangeWriter } } - // iterate over block body and remove receipts - self.remove::(from_transaction_num..=to_transaction_num)?; + self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?; Ok(()) } @@ -2045,7 +2080,11 @@ impl StateChangeWriter /// 1. Take the old value from the changeset /// 2. Take the new value from the local state /// 3. Set the local state to the value in the changeset - fn take_state_above(&self, block: BlockNumber) -> ProviderResult { + fn take_state_above( + &self, + block: BlockNumber, + remove_receipts_from: StorageLocation, + ) -> ProviderResult { let range = block + 1..=self.last_block_number()?; if range.is_empty() { @@ -2115,22 +2154,45 @@ impl StateChangeWriter } } - // iterate over block body and create ExecutionResult - let mut receipt_iter = - self.take::(from_transaction_num..=to_transaction_num)?.into_iter(); + // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts + let mut receipts_iter = self + .static_file_provider + .get_range_with_static_file_or_database( + StaticFileSegment::Receipts, + from_transaction_num..to_transaction_num + 1, + |static_file, range, _| { + static_file + .receipts_by_tx_range(range.clone()) + .map(|r| range.into_iter().zip(r).collect()) + }, + |range, _| { + self.tx + .cursor_read::()? + .walk_range(range)? + .map(|r| r.map_err(Into::into)) + .collect() + }, + |_| true, + )? + .into_iter() + .peekable(); let mut receipts = Vec::with_capacity(block_bodies.len()); // loop break if we are at the end of the blocks. for (_, block_body) in block_bodies { let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); - for _ in block_body.tx_num_range() { - if let Some((_, receipt)) = receipt_iter.next() { - block_receipts.push(Some(receipt)); + for num in block_body.tx_num_range() { + if receipts_iter.peek().is_some_and(|(n, _)| *n == num) { + block_receipts.push(receipts_iter.next().map(|(_, r)| r)); + } else { + block_receipts.push(None); } } receipts.push(block_receipts); } + self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?; + Ok(ExecutionOutcome::new_init( state, reverts, @@ -2594,20 +2656,20 @@ impl BlockExecu fn take_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult> { let range = block + 1..=self.last_block_number()?; self.unwind_trie_state_range(range.clone())?; // get execution res - let execution_state = self.take_state_above(block)?; + let execution_state = self.take_state_above(block, remove_from)?; let blocks = self.sealed_block_with_senders_range(range)?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - self.remove_blocks_above(block, remove_transactions_from)?; + self.remove_blocks_above(block, remove_from)?; // Update pipeline progress self.update_pipeline_stages(block, true)?; @@ -2618,18 +2680,18 @@ impl BlockExecu fn remove_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult<()> { let range = block + 1..=self.last_block_number()?; self.unwind_trie_state_range(range)?; // remove execution res - self.remove_state_above(block)?; + self.remove_state_above(block, remove_from)?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - self.remove_blocks_above(block, remove_transactions_from)?; + self.remove_blocks_above(block, remove_from)?; // Update pipeline progress self.update_pipeline_stages(block, true)?; diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index be4042fe28..6d7e576124 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -37,19 +37,25 @@ pub trait BlockExecutionWriter: /// Take all of the blocks above the provided number and their execution result /// /// The passed block number will stay in the database. + /// + /// Accepts [`StorageLocation`] specifying from where should transactions and receipts be + /// removed. fn take_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult>; /// Remove all of the blocks above the provided number and their execution result /// /// The passed block number will stay in the database. + /// + /// Accepts [`StorageLocation`] specifying from where should transactions and receipts be + /// removed. fn remove_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult<()>; } @@ -57,17 +63,17 @@ impl BlockExecutionWriter for &T { fn take_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult> { - (*self).take_block_and_execution_above(block, remove_transactions_from) + (*self).take_block_and_execution_above(block, remove_from) } fn remove_block_and_execution_above( &self, block: BlockNumber, - remove_transactions_from: StorageLocation, + remove_from: StorageLocation, ) -> ProviderResult<()> { - (*self).remove_block_and_execution_above(block, remove_transactions_from) + (*self).remove_block_and_execution_above(block, remove_from) } } diff --git a/crates/storage/provider/src/traits/state.rs b/crates/storage/provider/src/traits/state.rs index 057d3a19a7..2e46e28507 100644 --- a/crates/storage/provider/src/traits/state.rs +++ b/crates/storage/provider/src/traits/state.rs @@ -40,9 +40,17 @@ pub trait StateChangeWriter { /// Remove the block range of state above the given block. The state of the passed block is not /// removed. - fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()>; + fn remove_state_above( + &self, + block: BlockNumber, + remove_receipts_from: StorageLocation, + ) -> ProviderResult<()>; /// Take the block range of state, recreating the [`ExecutionOutcome`]. The state of the passed /// block is not removed. - fn take_state_above(&self, block: BlockNumber) -> ProviderResult; + fn take_state_above( + &self, + block: BlockNumber, + remove_receipts_from: StorageLocation, + ) -> ProviderResult; } diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index d4d5116de9..c0eeb64b8a 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -189,25 +189,16 @@ where /// database and static files. This is exclusive, i.e., it only removes blocks above /// `block_number`, and does not remove `block_number`. pub fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { + // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block + debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number"); + self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?; + // Get highest static file block for the total block range let highest_static_file_block = self .static_file() .get_highest_static_file_block(StaticFileSegment::Headers) .expect("todo: error handling, headers should exist"); - // Get the total txs for the block range, so we have the correct number of columns for - // receipts and transactions - // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block - let tx_range = self - .database() - .transaction_range_by_block_range(block_number + 1..=highest_static_file_block)?; - // We are using end + 1 - start here because the returned range is inclusive. - let total_txs = (tx_range.end() + 1).saturating_sub(*tx_range.start()); - - // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block - debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number"); - self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?; - // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure // we remove only what is ABOVE the block. // @@ -218,12 +209,6 @@ where .get_writer(block_number, StaticFileSegment::Headers)? .prune_headers(highest_static_file_block.saturating_sub(block_number))?; - if !self.database().prune_modes_ref().has_receipts_pruning() { - self.static_file() - .get_writer(block_number, StaticFileSegment::Receipts)? - .prune_receipts(total_txs, block_number)?; - } - Ok(()) } }