From 404f8f877842efa887a1b39e78de2eb6ec97289d Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Tue, 26 Nov 2024 05:38:46 +0400 Subject: [PATCH] refactor: unify logic for writing headers (#12858) --- .../src/providers/database/provider.rs | 39 +++++----- crates/storage/provider/src/traits/block.rs | 5 +- crates/storage/provider/src/writer/mod.rs | 72 +------------------ 3 files changed, 29 insertions(+), 87 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 3723e2606d..5110b067f4 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2749,22 +2749,12 @@ impl BlockWrite fn insert_block( &self, block: SealedBlockWithSenders, - write_transactions_to: StorageLocation, + write_to: StorageLocation, ) -> ProviderResult { let block_number = block.number; let mut durations_recorder = metrics::DurationsRecorder::default(); - self.tx.put::(block_number, block.hash())?; - durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders); - - // Put header with canonical hashes. - self.tx.put::(block_number, block.header.as_ref().clone())?; - durations_recorder.record_relative(metrics::Action::InsertHeaders); - - self.tx.put::(block.hash(), block_number)?; - durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers); - // total difficulty let ttd = if block_number == 0 { block.difficulty @@ -2775,8 +2765,26 @@ impl BlockWrite parent_ttd + block.difficulty }; - self.tx.put::(block_number, ttd.into())?; - durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties); + if write_to.database() { + self.tx.put::(block_number, block.hash())?; + durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders); + + // Put header with canonical hashes. + self.tx.put::(block_number, block.header.as_ref().clone())?; + durations_recorder.record_relative(metrics::Action::InsertHeaders); + + self.tx.put::(block_number, ttd.into())?; + durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties); + } + + if write_to.static_files() { + let mut writer = + self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?; + writer.append_header(&block.header, ttd, &block.hash())?; + } + + self.tx.put::(block.hash(), block_number)?; + durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers); let mut next_tx_num = self .tx @@ -2805,10 +2813,7 @@ impl BlockWrite next_tx_num += 1; } - self.append_block_bodies( - vec![(block_number, Some(block.block.body))], - write_transactions_to, - )?; + self.append_block_bodies(vec![(block_number, Some(block.block.body))], write_to)?; debug!( target: "providers::db", diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index aec5436265..59a5f9b3f6 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -68,10 +68,13 @@ pub trait BlockWriter: Send + Sync { /// /// Return [StoredBlockBodyIndices] that contains indices of the first and last transactions and /// transition in the block. + /// + /// Accepts [`StorageLocation`] value which specifies where transactions and headers should be + /// written. fn insert_block( &self, block: SealedBlockWithSenders, - write_transactions_to: StorageLocation, + write_to: StorageLocation, ) -> ProviderResult; /// Appends a batch of block bodies extending the canonical chain. This is invoked during diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index e91ea3bea3..2855251436 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -4,26 +4,22 @@ use crate::{ BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter, }; -use alloy_consensus::Header; -use alloy_primitives::{BlockNumber, B256, U256}; +use alloy_primitives::BlockNumber; use reth_chain_state::ExecutedBlock; use reth_db::{ cursor::DbCursorRO, - models::CompactU256, tables, transaction::{DbTx, DbTxMut}, }; use reth_errors::{ProviderError, ProviderResult}; use reth_execution_types::ExecutionOutcome; -use reth_primitives::{SealedBlock, StaticFileSegment}; -use reth_stages_types::{StageCheckpoint, StageId}; +use reth_primitives::StaticFileSegment; use reth_storage_api::{ DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt, }; use reth_storage_errors::writer::UnifiedStorageWriterError; use revm::db::OriginalValuesKnown; -use std::{borrow::Borrow, sync::Arc}; -use tracing::{debug, instrument}; +use tracing::debug; mod database; mod static_file; @@ -196,7 +192,6 @@ where let sealed_block = block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); self.database().insert_block(sealed_block, StorageLocation::Both)?; - self.save_header_and_transactions(block.block.clone())?; // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. @@ -223,35 +218,6 @@ where Ok(()) } - /// Writes the header & transactions to static files, and updates their respective checkpoints - /// on database. - #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "storage")] - fn save_header_and_transactions(&self, block: Arc) -> ProviderResult<()> { - debug!(target: "provider::storage_writer", "Writing headers and transactions."); - - { - let header_writer = - self.static_file().get_writer(block.number, StaticFileSegment::Headers)?; - let mut storage_writer = UnifiedStorageWriter::from(self.database(), header_writer); - let td = storage_writer.append_headers_from_blocks( - block.header().number, - std::iter::once(&(block.header(), block.hash())), - )?; - - debug!(target: "provider::storage_writer", block_num=block.number, "Updating transaction metadata after writing"); - self.database() - .tx_ref() - .put::(block.number, CompactU256(td))?; - self.database() - .save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?; - } - - self.database() - .save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?; - - Ok(()) - } - /// Removes all block, transaction and receipt data above the given block number from the /// database and static files. This is exclusive, i.e., it only removes blocks above /// `block_number`, and does not remove `block_number`. @@ -323,38 +289,6 @@ where None => Err(UnifiedStorageWriterError::MissingStaticFileWriter), } } - - /// Appends headers to static files, using the - /// [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) table to determine the - /// total difficulty of the parent block during header insertion. - /// - /// NOTE: The static file writer used to construct this [`UnifiedStorageWriter`] MUST be a - /// writer for the Headers segment. - pub fn append_headers_from_blocks( - &mut self, - initial_block_number: BlockNumber, - headers: impl Iterator, - ) -> ProviderResult - where - I: Borrow<(H, B256)>, - H: Borrow
, - { - self.ensure_static_file_segment(StaticFileSegment::Headers)?; - - let mut td = self - .database() - .header_td_by_number(initial_block_number)? - .ok_or(ProviderError::TotalDifficultyNotFound(initial_block_number))?; - - for pair in headers { - let (header, hash) = pair.borrow(); - let header = header.borrow(); - td += header.difficulty; - self.static_file_mut().append_header(header, td, hash)?; - } - - Ok(td) - } } impl