chore: move remove_blocks_above to StorageWriter (#9997)

This commit is contained in:
joshieDo
2024-08-01 18:06:44 +01:00
committed by GitHub
parent 7fa16c9155
commit 333dbdeda2
2 changed files with 55 additions and 60 deletions

View File

@@ -2,13 +2,8 @@
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, B256};
use reth_provider::{
providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader,
DatabaseProviderRW, ProviderFactory, StaticFileProviderFactory, StaticFileWriter,
TransactionsProviderExt,
};
use reth_primitives::{SealedBlock, B256};
use reth_provider::{writer::StorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_prune::{Pruner, PrunerOutput};
use std::sync::{
mpsc::{Receiver, SendError, Sender},
@@ -44,48 +39,6 @@ impl<DB: Database> PersistenceService<DB> {
Self { provider, incoming, pruner }
}
/// Removes all block, transaction and receipt data above the given block number from the
/// database and static files. This is exclusive, i.e., it only removes blocks above
/// `block_number`, and does not remove `block_number`.
fn remove_blocks_above(
&self,
block_number: u64,
provider_rw: &DatabaseProviderRW<DB>,
sf_provider: &StaticFileProvider,
) -> ProviderResult<()> {
// Get highest static file block for the total block range
let highest_static_file_block = sf_provider
.get_highest_static_file_block(StaticFileSegment::Headers)
.expect("todo: error handling, headers should exist");
// Get the total txs for the block range, so we have the correct number of columns for
// receipts and transactions
let tx_range = provider_rw
.transaction_range_by_block_range(block_number..=highest_static_file_block)?;
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number");
provider_rw
.remove_block_and_execution_range(block_number..=provider_rw.last_block_number()?)?;
debug!(target: "tree::persistence", ?block_number, "Removing static file blocks above block_number");
sf_provider
.get_writer(block_number, StaticFileSegment::Headers)?
.prune_headers(highest_static_file_block.saturating_sub(block_number))?;
sf_provider
.get_writer(block_number, StaticFileSegment::Transactions)?
.prune_transactions(total_txs, block_number)?;
if !provider_rw.prune_modes_ref().has_receipts_pruning() {
sf_provider
.get_writer(block_number, StaticFileSegment::Receipts)?
.prune_receipts(total_txs, block_number)?;
}
Ok(())
}
/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
@@ -109,11 +62,11 @@ where
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
let sf_provider = self.provider.static_file_provider();
self.remove_blocks_above(new_tip_num, &provider_rw, &sf_provider)
StorageWriter::from(&provider_rw, &sf_provider)
.remove_blocks_above(new_tip_num)
.expect("todo: handle errors");
StorageWriter::commit_unwind(provider_rw, sf_provider)
.expect("todo: handle errors");
provider_rw.commit().expect("todo: handle errors");
sf_provider.commit().expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());

View File

@@ -1,8 +1,8 @@
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter},
writer::static_file::StaticFileWriter,
BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter, StateChangeWriter,
StateWriter, TrieWriter,
BlockExecutionWriter, BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter,
StateChangeWriter, StateWriter, TrieWriter,
};
use reth_chain_state::ExecutedBlock;
use reth_db::{
@@ -18,7 +18,9 @@ use reth_primitives::{
BlockNumber, Header, SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{HeaderProvider, ReceiptWriter, StageCheckpointWriter};
use reth_storage_api::{
BlockNumReader, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
};
use reth_storage_errors::writer::StorageWriterError;
use revm::db::OriginalValuesKnown;
use std::{borrow::Borrow, sync::Arc};
@@ -173,7 +175,7 @@ where
)
};
debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks and execution data to storage");
debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage");
// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
@@ -210,7 +212,7 @@ where
// Update pipeline progress
self.database().update_pipeline_stages(last_block_number, false)?;
debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");
debug!(target: "provider::storage_writer", range = ?first_number..=last_block_number, "Appended block data");
Ok(())
}
@@ -219,7 +221,7 @@ where
/// on database.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")]
fn save_header_and_transactions(&self, block: Arc<SealedBlock>) -> ProviderResult<()> {
debug!(target: "tree::persistence", "Writing headers and transactions.");
debug!(target: "provider::storage_writer", "Writing headers and transactions.");
{
let header_writer =
@@ -230,7 +232,7 @@ where
std::iter::once(&(block.header(), block.hash())),
)?;
debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing");
debug!(target: "provider::storage_writer", block_num=block.number, "Updating transaction metadata after writing");
self.database()
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block.number, CompactU256(td))?;
@@ -255,6 +257,46 @@ where
Ok(())
}
/// Removes all block, transaction and receipt data above the given block number from the
/// database and static files. This is exclusive, i.e., it only removes blocks above
/// `block_number`, and does not remove `block_number`.
pub fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
// Get highest static file block for the total block range
let highest_static_file_block = self
.static_file()
.get_highest_static_file_block(StaticFileSegment::Headers)
.expect("todo: error handling, headers should exist");
// Get the total txs for the block range, so we have the correct number of columns for
// receipts and transactions
let tx_range = self
.database()
.transaction_range_by_block_range(block_number..=highest_static_file_block)?;
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
self.database().remove_block_and_execution_range(
block_number..=self.database().last_block_number()?,
)?;
debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number");
self.static_file()
.get_writer(block_number, StaticFileSegment::Headers)?
.prune_headers(highest_static_file_block.saturating_sub(block_number))?;
self.static_file()
.get_writer(block_number, StaticFileSegment::Transactions)?
.prune_transactions(total_txs, block_number)?;
if !self.database().prune_modes_ref().has_receipts_pruning() {
self.static_file()
.get_writer(block_number, StaticFileSegment::Receipts)?
.prune_receipts(total_txs, block_number)?;
}
Ok(())
}
}
impl<'a, 'b, TX> StorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>