diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 99cbab79b9..316e9330b8 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -3,11 +3,12 @@ use reth_chain_state::ExecutedBlock; use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database}; use reth_errors::ProviderResult; -use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; +use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256}; use reth_provider::{ - writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW, - HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter, - StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter, + providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader, + BlockWriter, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderFactory, + StageCheckpointWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory, + StaticFileWriter, TransactionsProviderExt, TrieWriter, }; use reth_prune::{Pruner, PrunerOutput}; use reth_stages_types::{StageCheckpoint, StageId}; @@ -45,22 +46,110 @@ impl PersistenceService { Self { provider, incoming, pruner } } + /// Removes block data above the given block number from the database. + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// This will then send a command to the static file service, to remove the actual block data. + fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { + debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number"); + let provider_rw = self.provider.provider_rw()?; + let highest_block = self.provider.last_block_number()?; + provider_rw.remove_block_and_execution_range(block_number..=highest_block)?; + provider_rw.commit()?; + + Ok(()) + } + + /// Prunes block data before the given block hash according to the configured prune + /// configuration. + fn prune_before(&mut self, block_num: u64) -> PrunerOutput { + debug!(target: "tree::persistence", ?block_num, "Running pruner"); + // TODO: doing this properly depends on pruner segment changes + self.pruner.run(block_num).expect("todo: handle errors") + } + + /// Writes the transactions to static files. + /// + /// Returns the block number and new total difficulty. + #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")] + fn write_transactions( + &self, + block: Arc, + provider_rw: &DatabaseProviderRW, + ) -> ProviderResult<()> { + debug!(target: "tree::persistence", "Writing transactions"); + let provider = self.provider.static_file_provider(); + + let td = { + let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; + let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer)); + let td = storage_writer.append_headers_from_blocks( + block.header().number, + std::iter::once(&(block.header(), block.hash())), + )?; + + let transactions_writer = + provider.get_writer(block.number, StaticFileSegment::Transactions)?; + let mut storage_writer = + StorageWriter::new(Some(provider_rw), Some(transactions_writer)); + let no_hash_transactions = + block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect(); + storage_writer.append_transactions_from_blocks( + block.header().number, + std::iter::once(&no_hash_transactions), + )?; + + td + }; + + debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing"); + provider_rw + .tx_ref() + .put::(block.number, CompactU256(td))?; + provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?; + provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; + + Ok(()) + } + /// Writes the cloned tree state to database - fn write( + fn save_blocks( &self, blocks: &[ExecutedBlock], provider_rw: &DatabaseProviderRW, + static_file_provider: &StaticFileProvider, ) -> ProviderResult<()> { if blocks.is_empty() { debug!(target: "tree::persistence", "Attempted to write empty block range"); return Ok(()) } - debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database"); - let first_number = blocks.first().unwrap().block().number; + // NOTE: checked non-empty above + let first_block = blocks.first().unwrap().block(); + let last_block = blocks.last().unwrap().block().clone(); - let last = blocks.last().unwrap().block(); - let last_block_number = last.number; + // use the storage writer to write receipts + let current_block = first_block.number; + debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files"); + + let receipts_writer = + static_file_provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; + + { + let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer)); + let receipts_iter = blocks.iter().map(|block| { + let receipts = block.execution_outcome().receipts().receipt_vec.clone(); + debug_assert!(receipts.len() == 1); + receipts.first().unwrap().clone() + }); + storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?; + } + + debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database"); + let first_number = first_block.number; + + let last_block_number = last_block.number; // TODO: remove all the clones and do performant / batched writes for each type of object // instead of a loop over all blocks, @@ -75,6 +164,7 @@ impl PersistenceService { let sealed_block = block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); provider_rw.insert_block(sealed_block)?; + self.write_transactions(block.block.clone(), provider_rw)?; // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. @@ -103,121 +193,6 @@ impl PersistenceService { Ok(()) } - /// Removes block data above the given block number from the database. - /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove - /// `block_number`. - /// - /// This will then send a command to the static file service, to remove the actual block data. - fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { - debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number"); - let provider_rw = self.provider.provider_rw()?; - let highest_block = self.provider.last_block_number()?; - provider_rw.remove_block_and_execution_range(block_number..=highest_block)?; - provider_rw.commit()?; - - Ok(()) - } - - /// Prunes block data before the given block hash according to the configured prune - /// configuration. - fn prune_before(&mut self, block_num: u64) -> PrunerOutput { - debug!(target: "tree::persistence", ?block_num, "Running pruner"); - // TODO: doing this properly depends on pruner segment changes - self.pruner.run(block_num).expect("todo: handle errors") - } - - /// Updates checkpoints related to block headers and bodies. This should be called after new - /// transactions have been successfully written to disk. - fn update_transaction_meta( - &self, - block_num: u64, - td: U256, - provider_rw: &DatabaseProviderRW, - ) -> ProviderResult<()> { - debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing"); - provider_rw - .tx_ref() - .put::(block_num, CompactU256(td))?; - provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; - provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; - Ok(()) - } - - /// Writes the transactions to static files. - /// - /// Returns the block number and new total difficulty. - /// - /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called - /// after this, to update the checkpoints for headers and block bodies. - #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")] - fn write_transactions( - &self, - block: Arc, - provider_rw: &DatabaseProviderRW, - ) -> ProviderResult<(u64, U256)> { - debug!(target: "tree::persistence", "Writing transactions"); - let provider = self.provider.static_file_provider(); - - let new_td = { - let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; - let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer)); - let new_td = storage_writer.append_headers_from_blocks( - block.header().number, - std::iter::once(&(block.header(), block.hash())), - )?; - - let transactions_writer = - provider.get_writer(block.number, StaticFileSegment::Transactions)?; - let mut storage_writer = - StorageWriter::new(Some(provider_rw), Some(transactions_writer)); - let no_hash_transactions = - block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect(); - storage_writer.append_transactions_from_blocks( - block.header().number, - std::iter::once(&no_hash_transactions), - )?; - - new_td - }; - - Ok((block.number, new_td)) - } - - /// Write execution-related block data to database and/or static files. - fn write_execution_data( - &self, - blocks: &[ExecutedBlock], - provider_rw: &DatabaseProviderRW, - ) -> ProviderResult<()> { - if blocks.is_empty() { - return Ok(()) - } - let provider = self.provider.static_file_provider(); - - // NOTE: checked non-empty above - let first_block = blocks.first().unwrap().block(); - let last_block = blocks.last().unwrap().block().clone(); - - // use the storage writer - let current_block = first_block.number; - debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files"); - - let receipts_writer = - provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; - - { - let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer)); - let receipts_iter = blocks.iter().map(|block| { - let receipts = block.execution_outcome().receipts().receipt_vec.clone(); - debug_assert!(receipts.len() == 1); - receipts.first().unwrap().clone() - }); - storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?; - } - - Ok(()) - } - /// Removes the blocks above the given block number from static files. Also removes related /// receipt and header data. /// @@ -289,19 +264,11 @@ where let last_block_hash = blocks.last().unwrap().block().hash(); let provider_rw = self.provider.provider_rw().expect("todo: handle errors"); - self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors"); - self.write(&blocks, &provider_rw).expect("todo: handle errors"); + let static_file_provider = self.provider.static_file_provider(); + self.save_blocks(&blocks, &provider_rw, &static_file_provider) + .expect("todo: handle errors"); - for block in &blocks { - // first write transactions - let (block_num, td) = self - .write_transactions(block.block.clone(), &provider_rw) - .expect("todo: handle errors"); - self.update_transaction_meta(block_num, td, &provider_rw) - .expect("todo: handle errors"); - } - - self.provider.static_file_provider().commit().expect("todo: handle errors"); + static_file_provider.commit().expect("todo: handle errors"); provider_rw.commit().expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result