diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index fc7f4818c6..7d396c3dfe 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -2,13 +2,8 @@ use reth_chain_state::ExecutedBlock; use reth_db::Database; -use reth_errors::ProviderResult; -use reth_primitives::{SealedBlock, StaticFileSegment, B256}; -use reth_provider::{ - providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader, - DatabaseProviderRW, ProviderFactory, StaticFileProviderFactory, StaticFileWriter, - TransactionsProviderExt, -}; +use reth_primitives::{SealedBlock, B256}; +use reth_provider::{writer::StorageWriter, ProviderFactory, StaticFileProviderFactory}; use reth_prune::{Pruner, PrunerOutput}; use std::sync::{ mpsc::{Receiver, SendError, Sender}, @@ -44,48 +39,6 @@ impl PersistenceService { Self { provider, incoming, pruner } } - /// Removes all block, transaction and receipt data above the given block number from the - /// database and static files. This is exclusive, i.e., it only removes blocks above - /// `block_number`, and does not remove `block_number`. - fn remove_blocks_above( - &self, - block_number: u64, - provider_rw: &DatabaseProviderRW, - sf_provider: &StaticFileProvider, - ) -> ProviderResult<()> { - // Get highest static file block for the total block range - let highest_static_file_block = sf_provider - .get_highest_static_file_block(StaticFileSegment::Headers) - .expect("todo: error handling, headers should exist"); - - // Get the total txs for the block range, so we have the correct number of columns for - // receipts and transactions - let tx_range = provider_rw - .transaction_range_by_block_range(block_number..=highest_static_file_block)?; - let total_txs = tx_range.end().saturating_sub(*tx_range.start()); - - debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number"); - provider_rw - .remove_block_and_execution_range(block_number..=provider_rw.last_block_number()?)?; - - debug!(target: "tree::persistence", ?block_number, "Removing static file blocks above block_number"); - sf_provider - .get_writer(block_number, StaticFileSegment::Headers)? - .prune_headers(highest_static_file_block.saturating_sub(block_number))?; - - sf_provider - .get_writer(block_number, StaticFileSegment::Transactions)? - .prune_transactions(total_txs, block_number)?; - - if !provider_rw.prune_modes_ref().has_receipts_pruning() { - sf_provider - .get_writer(block_number, StaticFileSegment::Receipts)? - .prune_receipts(total_txs, block_number)?; - } - - Ok(()) - } - /// Prunes block data before the given block hash according to the configured prune /// configuration. fn prune_before(&mut self, block_num: u64) -> PrunerOutput { @@ -109,11 +62,11 @@ where let provider_rw = self.provider.provider_rw().expect("todo: handle errors"); let sf_provider = self.provider.static_file_provider(); - self.remove_blocks_above(new_tip_num, &provider_rw, &sf_provider) + StorageWriter::from(&provider_rw, &sf_provider) + .remove_blocks_above(new_tip_num) + .expect("todo: handle errors"); + StorageWriter::commit_unwind(provider_rw, sf_provider) .expect("todo: handle errors"); - - provider_rw.commit().expect("todo: handle errors"); - sf_provider.commit().expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result let _ = sender.send(()); diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index b43eb30ce6..00776a86d9 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,8 +1,8 @@ use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter}, writer::static_file::StaticFileWriter, - BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter, StateChangeWriter, - StateWriter, TrieWriter, + BlockExecutionWriter, BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter, + StateChangeWriter, StateWriter, TrieWriter, }; use reth_chain_state::ExecutedBlock; use reth_db::{ @@ -18,7 +18,9 @@ use reth_primitives::{ BlockNumber, Header, SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256, }; use reth_stages_types::{StageCheckpoint, StageId}; -use reth_storage_api::{HeaderProvider, ReceiptWriter, StageCheckpointWriter}; +use reth_storage_api::{ + BlockNumReader, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt, +}; use reth_storage_errors::writer::StorageWriterError; use revm::db::OriginalValuesKnown; use std::{borrow::Borrow, sync::Arc}; @@ -173,7 +175,7 @@ where ) }; - debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks and execution data to storage"); + debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage"); // TODO: remove all the clones and do performant / batched writes for each type of object // instead of a loop over all blocks, @@ -210,7 +212,7 @@ where // Update pipeline progress self.database().update_pipeline_stages(last_block_number, false)?; - debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data"); + debug!(target: "provider::storage_writer", range = ?first_number..=last_block_number, "Appended block data"); Ok(()) } @@ -219,7 +221,7 @@ where /// on database. #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")] fn save_header_and_transactions(&self, block: Arc) -> ProviderResult<()> { - debug!(target: "tree::persistence", "Writing headers and transactions."); + debug!(target: "provider::storage_writer", "Writing headers and transactions."); { let header_writer = @@ -230,7 +232,7 @@ where std::iter::once(&(block.header(), block.hash())), )?; - debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing"); + debug!(target: "provider::storage_writer", block_num=block.number, "Updating transaction metadata after writing"); self.database() .tx_ref() .put::(block.number, CompactU256(td))?; @@ -255,6 +257,46 @@ where Ok(()) } + + /// Removes all block, transaction and receipt data above the given block number from the + /// database and static files. This is exclusive, i.e., it only removes blocks above + /// `block_number`, and does not remove `block_number`. + pub fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { + // Get highest static file block for the total block range + let highest_static_file_block = self + .static_file() + .get_highest_static_file_block(StaticFileSegment::Headers) + .expect("todo: error handling, headers should exist"); + + // Get the total txs for the block range, so we have the correct number of columns for + // receipts and transactions + let tx_range = self + .database() + .transaction_range_by_block_range(block_number..=highest_static_file_block)?; + let total_txs = tx_range.end().saturating_sub(*tx_range.start()); + + debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number"); + self.database().remove_block_and_execution_range( + block_number..=self.database().last_block_number()?, + )?; + + debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number"); + self.static_file() + .get_writer(block_number, StaticFileSegment::Headers)? + .prune_headers(highest_static_file_block.saturating_sub(block_number))?; + + self.static_file() + .get_writer(block_number, StaticFileSegment::Transactions)? + .prune_transactions(total_txs, block_number)?; + + if !self.database().prune_modes_ref().has_receipts_pruning() { + self.static_file() + .get_writer(block_number, StaticFileSegment::Receipts)? + .prune_receipts(total_txs, block_number)?; + } + + Ok(()) + } } impl<'a, 'b, TX> StorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>