feat: combine block writing in persistence task (#9960)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
This commit is contained in:
Dan Cline
2024-08-01 05:59:35 -04:00
committed by GitHub
parent f52dc06938
commit 93ab09821b

View File

@@ -3,11 +3,12 @@
use reth_chain_state::ExecutedBlock;
use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database};
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256};
use reth_provider::{
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW,
HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter,
StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter,
providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader,
BlockWriter, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderFactory,
StageCheckpointWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory,
StaticFileWriter, TransactionsProviderExt, TrieWriter,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
@@ -45,22 +46,110 @@ impl<DB: Database> PersistenceService<DB> {
Self { provider, incoming, pruner }
}
/// Removes block data above the given block number from the database.
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
///
/// This will then send a command to the static file service, to remove the actual block data.
fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number");
let provider_rw = self.provider.provider_rw()?;
let highest_block = self.provider.last_block_number()?;
provider_rw.remove_block_and_execution_range(block_number..=highest_block)?;
provider_rw.commit()?;
Ok(())
}
/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
debug!(target: "tree::persistence", ?block_num, "Running pruner");
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num).expect("todo: handle errors")
}
/// Writes the transactions to static files.
///
/// Returns the block number and new total difficulty.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")]
fn write_transactions(
&self,
block: Arc<SealedBlock>,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
debug!(target: "tree::persistence", "Writing transactions");
let provider = self.provider.static_file_provider();
let td = {
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer));
let td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;
let transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
let mut storage_writer =
StorageWriter::new(Some(provider_rw), Some(transactions_writer));
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect();
storage_writer.append_transactions_from_blocks(
block.header().number,
std::iter::once(&no_hash_transactions),
)?;
td
};
debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing");
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block.number, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;
Ok(())
}
/// Writes the cloned tree state to database
fn write(
fn save_blocks(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
static_file_provider: &StaticFileProvider,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "tree::persistence", "Attempted to write empty block range");
return Ok(())
}
debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
let first_number = blocks.first().unwrap().block().number;
// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().block();
let last_block = blocks.last().unwrap().block().clone();
let last = blocks.last().unwrap().block();
let last_block_number = last.number;
// use the storage writer to write receipts
let current_block = first_block.number;
debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files");
let receipts_writer =
static_file_provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
{
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);
receipts.first().unwrap().clone()
});
storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;
}
debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
let first_number = first_block.number;
let last_block_number = last_block.number;
// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
@@ -75,6 +164,7 @@ impl<DB: Database> PersistenceService<DB> {
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
provider_rw.insert_block(sealed_block)?;
self.write_transactions(block.block.clone(), provider_rw)?;
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
@@ -103,121 +193,6 @@ impl<DB: Database> PersistenceService<DB> {
Ok(())
}
/// Removes block data above the given block number from the database.
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
///
/// This will then send a command to the static file service, to remove the actual block data.
fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number");
let provider_rw = self.provider.provider_rw()?;
let highest_block = self.provider.last_block_number()?;
provider_rw.remove_block_and_execution_range(block_number..=highest_block)?;
provider_rw.commit()?;
Ok(())
}
/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
debug!(target: "tree::persistence", ?block_num, "Running pruner");
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num).expect("todo: handle errors")
}
/// Updates checkpoints related to block headers and bodies. This should be called after new
/// transactions have been successfully written to disk.
fn update_transaction_meta(
&self,
block_num: u64,
td: U256,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block_num, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
Ok(())
}
/// Writes the transactions to static files.
///
/// Returns the block number and new total difficulty.
///
/// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
/// after this, to update the checkpoints for headers and block bodies.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")]
fn write_transactions(
&self,
block: Arc<SealedBlock>,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<(u64, U256)> {
debug!(target: "tree::persistence", "Writing transactions");
let provider = self.provider.static_file_provider();
let new_td = {
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer));
let new_td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;
let transactions_writer =
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
let mut storage_writer =
StorageWriter::new(Some(provider_rw), Some(transactions_writer));
let no_hash_transactions =
block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect();
storage_writer.append_transactions_from_blocks(
block.header().number,
std::iter::once(&no_hash_transactions),
)?;
new_td
};
Ok((block.number, new_td))
}
/// Write execution-related block data to database and/or static files.
fn write_execution_data(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
if blocks.is_empty() {
return Ok(())
}
let provider = self.provider.static_file_provider();
// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().block();
let last_block = blocks.last().unwrap().block().clone();
// use the storage writer
let current_block = first_block.number;
debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files");
let receipts_writer =
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
{
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);
receipts.first().unwrap().clone()
});
storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;
}
Ok(())
}
/// Removes the blocks above the given block number from static files. Also removes related
/// receipt and header data.
///
@@ -289,19 +264,11 @@ where
let last_block_hash = blocks.last().unwrap().block().hash();
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors");
self.write(&blocks, &provider_rw).expect("todo: handle errors");
let static_file_provider = self.provider.static_file_provider();
self.save_blocks(&blocks, &provider_rw, &static_file_provider)
.expect("todo: handle errors");
for block in &blocks {
// first write transactions
let (block_num, td) = self
.write_transactions(block.block.clone(), &provider_rw)
.expect("todo: handle errors");
self.update_transaction_meta(block_num, td, &provider_rw)
.expect("todo: handle errors");
}
self.provider.static_file_provider().commit().expect("todo: handle errors");
static_file_provider.commit().expect("todo: handle errors");
provider_rw.commit().expect("todo: handle errors");
// we ignore the error because the caller may or may not care about the result