diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 5dbaefcd29..314d0eba9d 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives; use reth_primitives_traits::NodePrimitives; use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter, - DBProvider, DatabaseProviderFactory, ProviderFactory, + DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode, }; use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; @@ -151,7 +151,7 @@ where if last_block.is_some() { let provider_rw = self.provider.database_provider_rw()?; - provider_rw.save_blocks(blocks)?; + provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?; provider_rw.commit()?; } diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs index db25afe909..8ab71f6610 100644 --- a/crates/optimism/cli/src/commands/import_receipts.rs +++ b/crates/optimism/cli/src/commands/import_receipts.rs @@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt}; use reth_primitives_traits::NodePrimitives; use reth_provider::{ providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown, - ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter, + ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter, StaticFileProviderFactory, StatsReader, }; use reth_stages::{StageCheckpoint, StageId}; @@ -228,7 +228,11 @@ where ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default()); // finally, write the receipts - provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?; + provider.write_state( + &execution_outcome, + OriginalValuesKnown::Yes, + 
StateWriteConfig::default(), + )?; } // Only commit if we have imported as many receipts as the number of transactions. diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index f78b825822..593180926d 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -12,7 +12,7 @@ use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockHashReader, BlockReader, DBProvider, EitherWriter, ExecutionOutcome, HeaderProvider, - LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter, + LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriteConfig, StateWriter, StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionVariant, }; use reth_revm::database::StateProviderDatabase; @@ -463,7 +463,7 @@ where } // write output - provider.write_state(&state, OriginalValuesKnown::Yes)?; + provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?; let db_write_duration = time.elapsed(); debug!( diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index d2b7b7f114..c82025970b 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -16,8 +16,8 @@ use reth_provider::{ errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, MetadataWriter, OriginalValuesKnown, - ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriter, - StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter, + ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, + StateWriter, StaticFileProviderFactory, 
StorageSettings, StorageSettingsCache, TrieWriter, }; use reth_stages_types::{StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; @@ -334,7 +334,11 @@ where Vec::new(), ); - provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?; + provider.write_state( + &execution_outcome, + OriginalValuesKnown::Yes, + StateWriteConfig::default(), + )?; trace!(target: "reth::cli", "Inserted state"); diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs index 8e15004645..c6d5a2e260 100644 --- a/crates/storage/errors/src/provider.rs +++ b/crates/storage/errors/src/provider.rs @@ -225,6 +225,9 @@ pub enum StaticFileWriterError { /// Cannot call `sync_all` or `finalize` when prune is queued. #[error("cannot call sync_all or finalize when prune is queued, use commit() instead")] FinalizeWithPruneQueued, + /// Thread panicked during execution. + #[error("thread panicked: {_0}")] + ThreadPanic(&'static str), /// Other error with message. 
#[error("{0}")] Other(String), diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index 0b1202095e..601d82b805 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -58,6 +58,9 @@ impl TxnManager { match rx.recv() { Ok(msg) => match msg { TxnManagerMessage::Begin { parent, flags, sender } => { + let _span = + tracing::debug_span!(target: "libmdbx::txn", "begin", flags) + .entered(); let mut txn: *mut ffi::MDBX_txn = ptr::null_mut(); let res = mdbx_result(unsafe { ffi::mdbx_txn_begin_ex( @@ -72,9 +75,13 @@ impl TxnManager { sender.send(res).unwrap(); } TxnManagerMessage::Abort { tx, sender } => { + let _span = + tracing::debug_span!(target: "libmdbx::txn", "abort").entered(); sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap(); } TxnManagerMessage::Commit { tx, sender } => { + let _span = + tracing::debug_span!(target: "libmdbx::txn", "commit").entered(); sender .send({ let mut latency = CommitLatency::new(); diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 5336b773e6..5cc79d8522 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -429,6 +429,41 @@ where } } + /// Puts multiple transaction hash number mappings in a batch. + /// + /// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor. + /// This is more efficient than calling `put_transaction_hash_number` repeatedly. + /// + /// When `append_only` is true, uses `cursor.append()` which requires entries to be + /// pre-sorted and the table to be empty or have only lower keys. + /// When false, uses `cursor.upsert()` which handles arbitrary insertion order. 
+ pub fn put_transaction_hash_numbers_batch( + &mut self, + entries: Vec<(TxHash, TxNumber)>, + append_only: bool, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => { + for (hash, tx_num) in entries { + if append_only { + cursor.append(hash, &tx_num)?; + } else { + cursor.upsert(hash, &tx_num)?; + } + } + Ok(()) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => { + for (hash, tx_num) in entries { + batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?; + } + Ok(()) + } + } + } + /// Deletes a transaction hash number mapping. pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> { match self { diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 317216dc94..bfab44cb2a 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -21,7 +21,8 @@ pub mod providers; pub use providers::{ DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider, HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory, - StaticFileAccess, StaticFileProviderBuilder, StaticFileWriter, + SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx, + StaticFileWriter, }; pub mod changeset_walker; @@ -44,8 +45,8 @@ pub use revm_database::states::OriginalValuesKnown; // reexport traits to avoid breaking changes pub use reth_static_file_types as static_file; pub use reth_storage_api::{ - HistoryWriter, MetadataProvider, MetadataWriter, StatsReader, StorageSettings, - StorageSettingsCache, + HistoryWriter, MetadataProvider, MetadataWriter, StateWriteConfig, StatsReader, + StorageSettings, StorageSettingsCache, }; /// Re-export provider error. 
pub use reth_storage_errors::provider::{ProviderError, ProviderResult}; diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 3565d99d8d..e12095ff44 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -789,7 +789,7 @@ mod tests { create_test_provider_factory, create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB, }, - BlockWriter, CanonChainTracker, ProviderFactory, + BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode, }; use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag}; use alloy_primitives::{BlockNumber, TxNumber, B256}; @@ -808,8 +808,8 @@ mod tests { use reth_storage_api::{ BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, ChangeSetReader, DBProvider, DatabaseProviderFactory, - HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, StateWriter, - TransactionVariant, TransactionsProvider, + HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, + StateWriteConfig, StateWriter, TransactionVariant, TransactionsProvider, }; use reth_testing_utils::generators::{ self, random_block, random_block_range, random_changeset_range, random_eoa_accounts, @@ -907,6 +907,7 @@ mod tests { ..Default::default() }, OriginalValuesKnown::No, + StateWriteConfig::default(), )?; } @@ -997,7 +998,7 @@ mod tests { // Push to disk let provider_rw = hook_provider.database_provider_rw().unwrap(); - provider_rw.save_blocks(vec![lowest_memory_block]).unwrap(); + provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap(); provider_rw.commit().unwrap(); // Remove from memory diff --git a/crates/storage/provider/src/providers/database/metrics.rs b/crates/storage/provider/src/providers/database/metrics.rs index de7dc0b542..45186da71d 100644 
--- a/crates/storage/provider/src/providers/database/metrics.rs +++ b/crates/storage/provider/src/providers/database/metrics.rs @@ -40,16 +40,8 @@ pub(crate) enum Action { InsertHeaderNumbers, InsertBlockBodyIndices, InsertTransactionBlocks, - GetNextTxNum, InsertTransactionSenders, InsertTransactionHashNumbers, - SaveBlocksInsertBlock, - SaveBlocksWriteState, - SaveBlocksWriteHashedState, - SaveBlocksWriteTrieChangesets, - SaveBlocksWriteTrieUpdates, - SaveBlocksUpdateHistoryIndices, - SaveBlocksUpdatePipelineStages, } /// Database provider metrics @@ -66,19 +58,24 @@ pub(crate) struct DatabaseProviderMetrics { insert_history_indices: Histogram, /// Duration of update pipeline stages update_pipeline_stages: Histogram, - /// Duration of insert canonical headers /// Duration of insert header numbers insert_header_numbers: Histogram, /// Duration of insert block body indices insert_block_body_indices: Histogram, /// Duration of insert transaction blocks insert_tx_blocks: Histogram, - /// Duration of get next tx num - get_next_tx_num: Histogram, /// Duration of insert transaction senders insert_transaction_senders: Histogram, /// Duration of insert transaction hash numbers insert_transaction_hash_numbers: Histogram, + /// Duration of `save_blocks` + save_blocks_total: Histogram, + /// Duration of MDBX work in `save_blocks` + save_blocks_mdbx: Histogram, + /// Duration of static file work in `save_blocks` + save_blocks_sf: Histogram, + /// Duration of `RocksDB` work in `save_blocks` + save_blocks_rocksdb: Histogram, /// Duration of `insert_block` in `save_blocks` save_blocks_insert_block: Histogram, /// Duration of `write_state` in `save_blocks` @@ -93,6 +90,39 @@ pub(crate) struct DatabaseProviderMetrics { save_blocks_update_history_indices: Histogram, /// Duration of `update_pipeline_stages` in `save_blocks` save_blocks_update_pipeline_stages: Histogram, + /// Number of blocks per `save_blocks` call + save_blocks_block_count: Histogram, + /// Duration of MDBX commit 
in `save_blocks` + save_blocks_commit_mdbx: Histogram, + /// Duration of static file commit in `save_blocks` + save_blocks_commit_sf: Histogram, + /// Duration of `RocksDB` commit in `save_blocks` + save_blocks_commit_rocksdb: Histogram, +} + +/// Timings collected during a `save_blocks` call. +#[derive(Debug, Default)] +pub(crate) struct SaveBlocksTimings { + pub total: Duration, + pub mdbx: Duration, + pub sf: Duration, + pub rocksdb: Duration, + pub insert_block: Duration, + pub write_state: Duration, + pub write_hashed_state: Duration, + pub write_trie_changesets: Duration, + pub write_trie_updates: Duration, + pub update_history_indices: Duration, + pub update_pipeline_stages: Duration, + pub block_count: u64, +} + +/// Timings collected during a `commit` call. +#[derive(Debug, Default)] +pub(crate) struct CommitTimings { + pub mdbx: Duration, + pub sf: Duration, + pub rocksdb: Duration, } impl DatabaseProviderMetrics { @@ -107,28 +137,33 @@ impl DatabaseProviderMetrics { Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration), Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration), Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration), - Action::GetNextTxNum => self.get_next_tx_num.record(duration), Action::InsertTransactionSenders => self.insert_transaction_senders.record(duration), Action::InsertTransactionHashNumbers => { self.insert_transaction_hash_numbers.record(duration) } - Action::SaveBlocksInsertBlock => self.save_blocks_insert_block.record(duration), - Action::SaveBlocksWriteState => self.save_blocks_write_state.record(duration), - Action::SaveBlocksWriteHashedState => { - self.save_blocks_write_hashed_state.record(duration) - } - Action::SaveBlocksWriteTrieChangesets => { - self.save_blocks_write_trie_changesets.record(duration) - } - Action::SaveBlocksWriteTrieUpdates => { - self.save_blocks_write_trie_updates.record(duration) - } - Action::SaveBlocksUpdateHistoryIndices => { - 
self.save_blocks_update_history_indices.record(duration) - } - Action::SaveBlocksUpdatePipelineStages => { - self.save_blocks_update_pipeline_stages.record(duration) - } } } + + /// Records all `save_blocks` timings. + pub(crate) fn record_save_blocks(&self, timings: &SaveBlocksTimings) { + self.save_blocks_total.record(timings.total); + self.save_blocks_mdbx.record(timings.mdbx); + self.save_blocks_sf.record(timings.sf); + self.save_blocks_rocksdb.record(timings.rocksdb); + self.save_blocks_insert_block.record(timings.insert_block); + self.save_blocks_write_state.record(timings.write_state); + self.save_blocks_write_hashed_state.record(timings.write_hashed_state); + self.save_blocks_write_trie_changesets.record(timings.write_trie_changesets); + self.save_blocks_write_trie_updates.record(timings.write_trie_updates); + self.save_blocks_update_history_indices.record(timings.update_history_indices); + self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages); + self.save_blocks_block_count.record(timings.block_count as f64); + } + + /// Records all commit timings. 
+ pub(crate) fn record_commit(&self, timings: &CommitTimings) { + self.save_blocks_commit_mdbx.record(timings.mdbx); + self.save_blocks_commit_sf.record(timings.sf); + self.save_blocks_commit_rocksdb.record(timings.rocksdb); + } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 38208f14c3..99c81755b4 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -43,7 +43,7 @@ use std::{ use tracing::trace; mod provider; -pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW}; +pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode}; use super::ProviderNodeTypes; use reth_trie::KeccakKeyHasher; diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 78424785cc..692bc7737c 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -5,7 +5,7 @@ use crate::{ providers::{ database::{chain::ChainStorage, metrics}, rocksdb::RocksDBProvider, - static_file::StaticFileWriter, + static_file::{StaticFileWriteCtx, StaticFileWriter}, NodeTypesForProvider, StaticFileProvider, }, to_range, @@ -35,7 +35,7 @@ use alloy_primitives::{ use itertools::Itertools; use parking_lot::RwLock; use rayon::slice::ParallelSliceMut; -use reth_chain_state::ExecutedBlock; +use reth_chain_state::{ComputedTrieData, ExecutedBlock}; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, @@ -61,10 +61,10 @@ use reth_stages_types::{StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter, - NodePrimitivesProvider, StateProvider, 
StorageChangeSetReader, StorageSettingsCache, - TryIntoHistoricalStateProvider, + NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, + StorageSettingsCache, TryIntoHistoricalStateProvider, }; -use reth_storage_errors::provider::ProviderResult; +use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError}; use reth_trie::{ trie_cursor::{ InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory, @@ -85,9 +85,10 @@ use std::{ fmt::Debug, ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive}, sync::Arc, - time::{Duration, Instant}, + thread, + time::Instant, }; -use tracing::{debug, trace}; +use tracing::{debug, instrument, trace}; /// A [`DatabaseProvider`] that holds a read-only database transaction. pub type DatabaseProviderRO = DatabaseProvider<::TX, N>; @@ -150,6 +151,25 @@ impl From> } } +/// Mode for [`DatabaseProvider::save_blocks`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SaveBlocksMode { + /// Full mode: write block structure + receipts + state + trie. + /// Used by engine/production code. + Full, + /// Blocks only: write block structure (headers, txs, senders, indices). + /// Receipts/state/trie are skipped - they may come later via separate calls. + /// Used by `insert_block`. + BlocksOnly, +} + +impl SaveBlocksMode { + /// Returns `true` if this is [`SaveBlocksMode::Full`]. + pub const fn with_state(self) -> bool { + matches!(self, Self::Full) + } +} + /// A provider struct that fetches data from the database. /// Wrapper around [`DbTx`] and [`DbTxMut`]. 
Example: [`HeaderProvider`] [`BlockHashReader`] pub struct DatabaseProvider { @@ -356,98 +376,257 @@ impl DatabaseProvider ProviderResult { + let tip = self.last_block_number()?.max(last_block); + Ok(StaticFileWriteCtx { + write_senders: EitherWriterDestination::senders(self).is_static_file() && + self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()), + write_receipts: save_mode.with_state() && + EitherWriter::receipts_destination(self).is_static_file(), + write_account_changesets: save_mode.with_state() && + EitherWriterDestination::account_changesets(self).is_static_file(), + tip, + receipts_prune_mode: self.prune_modes.receipts, + // Receipts are prunable if no receipts exist in SF yet and within pruning distance + receipts_prunable: self + .static_file_provider + .get_highest_static_file_tx(StaticFileSegment::Receipts) + .is_none() && + PruneMode::Distance(self.minimum_pruning_distance) + .should_prune(first_block, tip), + }) + } + /// Writes executed blocks and state to storage. - pub fn save_blocks(&self, blocks: Vec>) -> ProviderResult<()> { + /// + /// This method parallelizes static file (SF) writes with MDBX writes. + /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode + /// only). The main thread writes MDBX data (indices, state, trie - Full mode only). + /// + /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie). + /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`). 
+ #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))] + pub fn save_blocks( + &self, + blocks: Vec<ExecutedBlock<N::Primitives>>, + save_mode: SaveBlocksMode, + ) -> ProviderResult<()> { if blocks.is_empty() { debug!(target: "providers::db", "Attempted to write empty block range"); return Ok(()) } - // NOTE: checked non-empty above - let first_block = blocks.first().unwrap().recovered_block(); + let total_start = Instant::now(); + let block_count = blocks.len() as u64; + let first_number = blocks.first().unwrap().recovered_block().number(); + let last_block_number = blocks.last().unwrap().recovered_block().number(); - let last_block = blocks.last().unwrap().recovered_block(); - let first_number = first_block.number(); - let last_block_number = last_block.number(); + debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage"); - debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage"); + // Compute tx_nums upfront (both threads need these) + let first_tx_num = self + .tx + .cursor_read::<tables::TransactionBlocks>()? + .last()? 
+ .map(|(n, _)| n + 1) + .unwrap_or_default(); - // Accumulate durations for each step - let mut total_insert_block = Duration::ZERO; - let mut total_write_state = Duration::ZERO; - let mut total_write_hashed_state = Duration::ZERO; - let mut total_write_trie_changesets = Duration::ZERO; - let mut total_write_trie_updates = Duration::ZERO; + let tx_nums: Vec<TxNumber> = { + let mut nums = Vec::with_capacity(blocks.len()); + let mut current = first_tx_num; + for block in &blocks { + nums.push(current); + current += block.recovered_block().body().transaction_count() as u64; + } + nums + }; - // TODO: Do performant / batched writes for each type of object - // instead of a loop over all blocks, - // meaning: - // * blocks - // * state - // * hashed state - // * trie updates (cannot naively extend, need helper) - // * indices (already done basically) - // Insert the blocks - for block in blocks { - let trie_data = block.trie_data(); - let ExecutedBlock { recovered_block, execution_output, .. } = block; - let block_number = recovered_block.number(); + let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() }; + // avoid capturing &self.tx in scope below. 
+ let sf_provider = &self.static_file_provider; + let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?; + + thread::scope(|s| { + // SF writes + let sf_handle = s.spawn(|| { + let start = Instant::now(); + sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?; + Ok::<_, ProviderError>(start.elapsed()) + }); + + // MDBX writes + let mdbx_start = Instant::now(); + + // Collect all transaction hashes across all blocks, sort them, and write in batch + if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb && + self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) + { + let start = Instant::now(); + let mut all_tx_hashes = Vec::new(); + for (i, block) in blocks.iter().enumerate() { + let recovered_block = block.recovered_block(); + let mut tx_num = tx_nums[i]; + for transaction in recovered_block.body().transactions_iter() { + all_tx_hashes.push((*transaction.tx_hash(), tx_num)); + tx_num += 1; + } + } + + // Sort by hash for optimal MDBX insertion performance + all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash); + + // Write all transaction hash numbers in a single batch + self.with_rocksdb_batch(|batch| { + let mut tx_hash_writer = + EitherWriter::new_transaction_hash_numbers(self, batch)?; + tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?; + let raw_batch = tx_hash_writer.into_raw_rocksdb_batch(); + Ok(((), raw_batch)) + })?; + self.metrics.record_duration( + metrics::Action::InsertTransactionHashNumbers, + start.elapsed(), + ); + } + + for (i, block) in blocks.iter().enumerate() { + let recovered_block = block.recovered_block(); + + let start = Instant::now(); + self.insert_block_mdbx_only(recovered_block, tx_nums[i])?; + timings.insert_block += start.elapsed(); + + if save_mode.with_state() { + let execution_output = block.execution_outcome(); + let block_number = recovered_block.number(); + + // Write state and changesets to the database. 
+ // Must be written after blocks because of the receipt lookup. + // Skip receipts/account changesets if they're being written to static files. + let start = Instant::now(); + self.write_state( + execution_output, + OriginalValuesKnown::No, + StateWriteConfig { + write_receipts: !sf_ctx.write_receipts, + write_account_changesets: !sf_ctx.write_account_changesets, + }, + )?; + timings.write_state += start.elapsed(); + + let trie_data = block.trie_data(); + + // insert hashes and intermediate merkle nodes + let start = Instant::now(); + self.write_hashed_state(&trie_data.hashed_state)?; + timings.write_hashed_state += start.elapsed(); + + let start = Instant::now(); + self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?; + timings.write_trie_changesets += start.elapsed(); + + let start = Instant::now(); + self.write_trie_updates_sorted(&trie_data.trie_updates)?; + timings.write_trie_updates += start.elapsed(); + } + } + + // Full mode: update history indices + if save_mode.with_state() { + let start = Instant::now(); + self.update_history_indices(first_number..=last_block_number)?; + timings.update_history_indices = start.elapsed(); + } + + // Update pipeline progress let start = Instant::now(); - self.insert_block(&recovered_block)?; - total_insert_block += start.elapsed(); + self.update_pipeline_stages(last_block_number, false)?; + timings.update_pipeline_stages = start.elapsed(); - // Write state and changesets to the database. - // Must be written after blocks because of the receipt lookup. 
- let start = Instant::now(); - self.write_state(&execution_output, OriginalValuesKnown::No)?; - total_write_state += start.elapsed(); + timings.mdbx = mdbx_start.elapsed(); - // insert hashes and intermediate merkle nodes - let start = Instant::now(); - self.write_hashed_state(&trie_data.hashed_state)?; - total_write_hashed_state += start.elapsed(); + // Wait for SF thread + timings.sf = sf_handle + .join() + .map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??; - let start = Instant::now(); - self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?; - total_write_trie_changesets += start.elapsed(); + timings.total = total_start.elapsed(); + self.metrics.record_save_blocks(&timings); + debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data"); + + Ok(()) + }) + } + + /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX). + /// + /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately. 
+ #[instrument(level = "debug", target = "providers::db", skip_all)] + fn insert_block_mdbx_only( + &self, + block: &RecoveredBlock>, + first_tx_num: TxNumber, + ) -> ProviderResult { + if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) && + EitherWriterDestination::senders(self).is_database() + { let start = Instant::now(); - self.write_trie_updates_sorted(&trie_data.trie_updates)?; - total_write_trie_updates += start.elapsed(); + let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1)); + let mut cursor = self.tx.cursor_write::()?; + for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) { + cursor.append(tx_num, &sender)?; + } + self.metrics + .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed()); } - // update history indices + let block_number = block.number(); + let tx_count = block.body().transaction_count() as u64; + let start = Instant::now(); - self.update_history_indices(first_number..=last_block_number)?; - let duration_update_history_indices = start.elapsed(); + self.tx.put::(block.hash(), block_number)?; + self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed()); - // Update pipeline progress + self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?; + + Ok(StoredBlockBodyIndices { first_tx_num, tx_count }) + } + + /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`, + /// `Ommers`/`Withdrawals`). + fn write_block_body_indices( + &self, + block_number: BlockNumber, + body: &BodyTy, + first_tx_num: TxNumber, + tx_count: u64, + ) -> ProviderResult<()> { + // MDBX: BlockBodyIndices let start = Instant::now(); - self.update_pipeline_stages(last_block_number, false)?; - let duration_update_pipeline_stages = start.elapsed(); + self.tx + .cursor_write::()? 
+ .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?; + self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed()); - // Record all metrics at the end - self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block); - self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state); - self.metrics - .record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state); - self.metrics.record_duration( - metrics::Action::SaveBlocksWriteTrieChangesets, - total_write_trie_changesets, - ); - self.metrics - .record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates); - self.metrics.record_duration( - metrics::Action::SaveBlocksUpdateHistoryIndices, - duration_update_history_indices, - ); - self.metrics.record_duration( - metrics::Action::SaveBlocksUpdatePipelineStages, - duration_update_pipeline_stages, - ); + // MDBX: TransactionBlocks (last tx -> block mapping) + if tx_count > 0 { + let start = Instant::now(); + self.tx + .cursor_write::()? + .append(first_tx_num + tx_count - 1, &block_number)?; + self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed()); + } - debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data"); + // MDBX: Ommers/Withdrawals + self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?; Ok(()) } @@ -1727,6 +1906,7 @@ impl StageCheckpointWriter for DatabaseProvider(id.to_string(), checkpoint)?) 
} + #[instrument(level = "debug", target = "providers::db", skip_all)] fn update_pipeline_stages( &self, block_number: BlockNumber, @@ -1817,24 +1997,31 @@ impl StateWriter { type Receipt = ReceiptTy; + #[instrument(level = "debug", target = "providers::db", skip_all)] fn write_state( &self, execution_outcome: &ExecutionOutcome, is_value_known: OriginalValuesKnown, + config: StateWriteConfig, ) -> ProviderResult<()> { let first_block = execution_outcome.first_block(); + + let (plain_state, reverts) = + execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); + + self.write_state_reverts(reverts, first_block, config)?; + self.write_state_changes(plain_state)?; + + if !config.write_receipts { + return Ok(()); + } + let block_count = execution_outcome.len() as u64; let last_block = execution_outcome.last_block(); let block_range = first_block..=last_block; let tip = self.last_block_number()?.max(last_block); - let (plain_state, reverts) = - execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); - - self.write_state_reverts(reverts, first_block)?; - self.write_state_changes(plain_state)?; - // Fetch the first transaction number for each block in the range let block_indices: Vec<_> = self .block_body_indices_range(block_range)? 
@@ -1918,6 +2105,7 @@ impl StateWriter &self, reverts: PlainStateReverts, first_block: BlockNumber, + config: StateWriteConfig, ) -> ProviderResult<()> { // Write storage changes tracing::trace!("Writing storage changes"); @@ -1965,7 +2153,11 @@ impl StateWriter } } - // Write account changes to static files + if !config.write_account_changesets { + return Ok(()); + } + + // Write account changes tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes"); for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() { let block_number = first_block + block_index as BlockNumber; @@ -2043,6 +2235,7 @@ impl StateWriter Ok(()) } + #[instrument(level = "debug", target = "providers::db", skip_all)] fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> { // Write hashed account updates. let mut hashed_accounts_cursor = self.tx_ref().cursor_write::()?; @@ -2336,6 +2529,7 @@ impl TrieWriter for DatabaseProvider /// Writes trie updates to the database with already sorted updates. /// /// Returns the number of entries modified. + #[instrument(level = "debug", target = "providers::db", skip_all)] fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult { if trie_updates.is_empty() { return Ok(0) @@ -2379,6 +2573,7 @@ impl TrieWriter for DatabaseProvider /// the same `TrieUpdates`. /// /// Returns the number of keys written. 
+ #[instrument(level = "debug", target = "providers::db", skip_all)] fn write_trie_changesets( &self, block_number: BlockNumber, @@ -2970,6 +3165,7 @@ impl HistoryWriter for DatabaseProvi ) } + #[instrument(level = "debug", target = "providers::db", skip_all)] fn update_history_indices(&self, range: RangeInclusive) -> ProviderResult<()> { // account history stage { @@ -2987,7 +3183,7 @@ impl HistoryWriter for DatabaseProvi } } -impl BlockExecutionWriter +impl BlockExecutionWriter for DatabaseProvider { fn take_block_and_execution_above( @@ -3030,89 +3226,40 @@ impl BlockExecu } } -impl BlockWriter +impl BlockWriter for DatabaseProvider { type Block = BlockTy; type Receipt = ReceiptTy; - /// Inserts the block into the database, always modifying the following static file segments and - /// tables: - /// * [`StaticFileSegment::Headers`] - /// * [`tables::HeaderNumbers`] - /// * [`tables::BlockBodyIndices`] + /// Inserts the block into the database, writing to both static files and MDBX. /// - /// If there are transactions in the block, the following static file segments and tables will - /// be modified: - /// * [`StaticFileSegment::Transactions`] - /// * [`tables::TransactionBlocks`] - /// - /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers). - /// If withdrawals are not empty, this will modify - /// [`BlockWithdrawals`](tables::BlockWithdrawals). - /// - /// If the provider has __not__ configured full sender pruning, this will modify either: - /// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files - /// * [`tables::TransactionSenders`] if senders are written to the database - /// - /// If the provider has __not__ configured full transaction lookup pruning, this will modify - /// [`TransactionHashNumbers`](tables::TransactionHashNumbers). + /// This is a convenience method primarily used in tests. For production use, + /// prefer [`Self::save_blocks`] which handles execution output and trie data. 
fn insert_block( &self, block: &RecoveredBlock, ) -> ProviderResult { let block_number = block.number(); - let tx_count = block.body().transaction_count() as u64; - let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics); - - self.static_file_provider - .get_writer(block_number, StaticFileSegment::Headers)? - .append_header(block.header(), &block.hash())?; - - self.tx.put::(block.hash(), block_number)?; - durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers); - - let first_tx_num = self - .tx - .cursor_read::()? - .last()? - .map(|(n, _)| n + 1) - .unwrap_or_default(); - durations_recorder.record_relative(metrics::Action::GetNextTxNum); - - let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1)); - - if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) { - let mut senders_writer = EitherWriter::new_senders(self, block.number())?; - senders_writer.increment_block(block.number())?; - senders_writer - .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?; - durations_recorder.record_relative(metrics::Action::InsertTransactionSenders); - } - - if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) { - self.with_rocksdb_batch(|batch| { - let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?; - for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) { - let hash = transaction.tx_hash(); - writer.put_transaction_hash_number(*hash, tx_num, false)?; - } - Ok(((), writer.into_raw_rocksdb_batch())) - })?; - durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers); - } - - self.append_block_bodies(vec![(block_number, Some(block.body()))])?; - - debug!( - target: "providers::db", - ?block_number, - actions = ?durations_recorder.actions, - "Inserted block" + // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie) + let executed_block = ExecutedBlock::new( + 
Arc::new(block.clone()), + Arc::new(ExecutionOutcome::new( + Default::default(), + Vec::>>::new(), + block_number, + vec![], + )), + ComputedTrieData::default(), ); - Ok(StoredBlockBodyIndices { first_tx_num, tx_count }) + // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie) + self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?; + + // Return the body indices + self.block_body_indices(block_number)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number)) } fn append_block_bodies( @@ -3298,7 +3445,7 @@ impl BlockWrite durations_recorder.record_relative(metrics::Action::InsertBlock); } - self.write_state(execution_outcome, OriginalValuesKnown::No)?; + self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?; durations_recorder.record_relative(metrics::Action::InsertState); // insert hashes and intermediate merkle nodes @@ -3440,17 +3587,28 @@ impl DBProvider for DatabaseProvider self.static_file_provider.commit()?; } else { - self.static_file_provider.commit()?; + // Normal path: finalize() will call sync_all() if not already synced + let mut timings = metrics::CommitTimings::default(); + + let start = Instant::now(); + self.static_file_provider.finalize()?; + timings.sf = start.elapsed(); #[cfg(all(unix, feature = "rocksdb"))] { + let start = Instant::now(); let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock()); for batch in batches { self.rocksdb_provider.commit_batch(batch)?; } + timings.rocksdb = start.elapsed(); } + let start = Instant::now(); self.tx.commit()?; + timings.mdbx = start.elapsed(); + + self.metrics.record_commit(&timings); } Ok(()) @@ -3523,10 +3681,17 @@ mod tests { .write_state( &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, crate::OriginalValuesKnown::No, + StateWriteConfig::default(), ) .unwrap(); provider_rw.insert_block(&data.blocks[0].0).unwrap(); - provider_rw.write_state(&data.blocks[0].1, 
crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state( + &data.blocks[0].1, + crate::OriginalValuesKnown::No, + StateWriteConfig::default(), + ) + .unwrap(); provider_rw.commit().unwrap(); let provider = factory.provider().unwrap(); @@ -3549,11 +3714,18 @@ mod tests { .write_state( &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, crate::OriginalValuesKnown::No, + StateWriteConfig::default(), ) .unwrap(); for i in 0..3 { provider_rw.insert_block(&data.blocks[i].0).unwrap(); - provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state( + &data.blocks[i].1, + crate::OriginalValuesKnown::No, + StateWriteConfig::default(), + ) + .unwrap(); } provider_rw.commit().unwrap(); @@ -3579,13 +3751,20 @@ mod tests { .write_state( &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, crate::OriginalValuesKnown::No, + StateWriteConfig::default(), ) .unwrap(); // insert blocks 1-3 with receipts for i in 0..3 { provider_rw.insert_block(&data.blocks[i].0).unwrap(); - provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state( + &data.blocks[i].1, + crate::OriginalValuesKnown::No, + StateWriteConfig::default(), + ) + .unwrap(); } provider_rw.commit().unwrap(); @@ -3610,11 +3789,18 @@ mod tests { .write_state( &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, crate::OriginalValuesKnown::No, + StateWriteConfig::default(), ) .unwrap(); for i in 0..3 { provider_rw.insert_block(&data.blocks[i].0).unwrap(); - provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state( + &data.blocks[i].1, + crate::OriginalValuesKnown::No, + StateWriteConfig::default(), + ) + .unwrap(); } provider_rw.commit().unwrap(); @@ -3673,11 +3859,18 @@ mod tests { .write_state( &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], 
..Default::default() }, crate::OriginalValuesKnown::No, + StateWriteConfig::default(), ) .unwrap(); for i in 0..3 { provider_rw.insert_block(&data.blocks[i].0).unwrap(); - provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state( + &data.blocks[i].1, + crate::OriginalValuesKnown::No, + StateWriteConfig::default(), + ) + .unwrap(); } provider_rw.commit().unwrap(); @@ -4991,7 +5184,9 @@ mod tests { }]], ..Default::default() }; - provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap(); + provider_rw + .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default()) + .unwrap(); provider_rw.commit().unwrap(); }; diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 2ff34c7d2a..14f112a27b 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -10,7 +10,7 @@ pub use database::*; mod static_file; pub use static_file::{ StaticFileAccess, StaticFileJarProvider, StaticFileProvider, StaticFileProviderBuilder, - StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriter, + StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriteCtx, StaticFileWriter, }; mod state; diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 59d58ae00e..718283114f 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -14,6 +14,7 @@ use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, use dashmap::DashMap; use notify::{RecommendedWatcher, RecursiveMode, Watcher}; use parking_lot::RwLock; +use reth_chain_state::ExecutedBlock; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain}; use reth_db::{ lockfile::StorageLock, @@ -24,7 +25,7 @@ use reth_db::{ }; use 
reth_db_api::{ cursor::DbCursorRO, - models::StoredBlockBodyIndices, + models::{AccountBeforeTx, StoredBlockBodyIndices}, table::{Decompress, Table, Value}, tables, transaction::DbTx, @@ -32,7 +33,9 @@ use reth_db_api::{ use reth_ethereum_primitives::{Receipt, TransactionSigned}; use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION}; use reth_node_types::NodePrimitives; -use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction}; +use reth_primitives_traits::{ + AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader, SignedTransaction, +}; use reth_stages_types::{PipelineTarget, StageId}; use reth_static_file_types::{ find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap, @@ -41,15 +44,16 @@ use reth_static_file_types::{ use reth_storage_api::{ BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageSettingsCache, }; -use reth_storage_errors::provider::{ProviderError, ProviderResult}; +use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError}; use std::{ collections::BTreeMap, fmt::Debug, ops::{Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, sync::{atomic::AtomicU64, mpsc, Arc}, + thread, }; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, instrument, trace, warn}; /// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to /// represent either a block or a transaction number end of a static file range. @@ -77,6 +81,25 @@ impl StaticFileAccess { } } +/// Context for static file block writes. +/// +/// Contains target segments and pruning configuration. +#[derive(Debug, Clone, Copy, Default)] +pub struct StaticFileWriteCtx { + /// Whether transaction senders should be written to static files. + pub write_senders: bool, + /// Whether receipts should be written to static files. + pub write_receipts: bool, + /// Whether account changesets should be written to static files. 
+ pub write_account_changesets: bool, + /// The current chain tip block number (for pruning). + pub tip: BlockNumber, + /// The prune mode for receipts, if any. + pub receipts_prune_mode: Option, + /// Whether receipts are prunable (based on storage settings and prune distance). + pub receipts_prunable: bool, +} + /// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`]. /// /// "Static files" contain immutable chain history data, such as: @@ -504,6 +527,192 @@ impl StaticFileProvider { Ok(()) } + /// Writes headers for all blocks to the static file segment. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_headers( + w: &mut StaticFileProviderRWRefMut<'_, N>, + blocks: &[ExecutedBlock], + ) -> ProviderResult<()> { + for block in blocks { + let b = block.recovered_block(); + w.append_header(b.header(), &b.hash())?; + } + Ok(()) + } + + /// Writes transactions for all blocks to the static file segment. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_transactions( + w: &mut StaticFileProviderRWRefMut<'_, N>, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ) -> ProviderResult<()> { + for (block, &first_tx) in blocks.iter().zip(tx_nums) { + let b = block.recovered_block(); + w.increment_block(b.number())?; + for (i, tx) in b.body().transactions().iter().enumerate() { + w.append_transaction(first_tx + i as u64, tx)?; + } + } + Ok(()) + } + + /// Writes transaction senders for all blocks to the static file segment. 
+ #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_transaction_senders( + w: &mut StaticFileProviderRWRefMut<'_, N>, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ) -> ProviderResult<()> { + for (block, &first_tx) in blocks.iter().zip(tx_nums) { + let b = block.recovered_block(); + w.increment_block(b.number())?; + for (i, sender) in b.senders_iter().enumerate() { + w.append_transaction_sender(first_tx + i as u64, sender)?; + } + } + Ok(()) + } + + /// Writes receipts for all blocks to the static file segment. + #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_receipts( + w: &mut StaticFileProviderRWRefMut<'_, N>, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ctx: &StaticFileWriteCtx, + ) -> ProviderResult<()> { + for (block, &first_tx) in blocks.iter().zip(tx_nums) { + let block_number = block.recovered_block().number(); + w.increment_block(block_number)?; + + // skip writing receipts if pruning configuration requires us to. + if ctx.receipts_prunable && + ctx.receipts_prune_mode + .is_some_and(|mode| mode.should_prune(block_number, ctx.tip)) + { + continue + } + + for (i, receipt) in block.execution_outcome().receipts.iter().flatten().enumerate() { + w.append_receipt(first_tx + i as u64, receipt)?; + } + } + Ok(()) + } + + /// Writes account changesets for all blocks to the static file segment. 
+ #[instrument(level = "debug", target = "providers::db", skip_all)] + fn write_account_changesets( + w: &mut StaticFileProviderRWRefMut<'_, N>, + blocks: &[ExecutedBlock], + ) -> ProviderResult<()> { + for block in blocks { + let block_number = block.recovered_block().number(); + let reverts = block.execution_outcome().bundle.reverts.to_plain_state_reverts(); + + for account_block_reverts in reverts.accounts { + let changeset = account_block_reverts + .into_iter() + .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) }) + .collect::>(); + w.append_account_changeset(changeset, block_number)?; + } + } + Ok(()) + } + + /// Spawns a scoped thread that writes to a static file segment using the provided closure. + /// + /// The closure receives a mutable reference to the segment writer. After the closure completes, + /// `sync_all()` is called to flush writes to disk. + fn spawn_segment_writer<'scope, 'env, F>( + &'env self, + scope: &'scope thread::Scope<'scope, 'env>, + segment: StaticFileSegment, + first_block_number: BlockNumber, + f: F, + ) -> thread::ScopedJoinHandle<'scope, ProviderResult<()>> + where + F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env, + { + scope.spawn(move || { + let mut w = self.get_writer(first_block_number, segment)?; + f(&mut w)?; + w.sync_all() + }) + } + + /// Writes all static file data for multiple blocks in parallel per-segment. + /// + /// This spawns separate threads for each segment type and each thread calls `sync_all()` on its + /// writer when done. 
+ #[instrument(level = "debug", target = "providers::db", skip_all)] + pub fn write_blocks_data( + &self, + blocks: &[ExecutedBlock], + tx_nums: &[TxNumber], + ctx: StaticFileWriteCtx, + ) -> ProviderResult<()> { + if blocks.is_empty() { + return Ok(()); + } + + let first_block_number = blocks[0].recovered_block().number(); + + thread::scope(|s| { + let h_headers = + self.spawn_segment_writer(s, StaticFileSegment::Headers, first_block_number, |w| { + Self::write_headers(w, blocks) + }); + + let h_txs = self.spawn_segment_writer( + s, + StaticFileSegment::Transactions, + first_block_number, + |w| Self::write_transactions(w, blocks, tx_nums), + ); + + let h_senders = ctx.write_senders.then(|| { + self.spawn_segment_writer( + s, + StaticFileSegment::TransactionSenders, + first_block_number, + |w| Self::write_transaction_senders(w, blocks, tx_nums), + ) + }); + + let h_receipts = ctx.write_receipts.then(|| { + self.spawn_segment_writer(s, StaticFileSegment::Receipts, first_block_number, |w| { + Self::write_receipts(w, blocks, tx_nums, &ctx) + }) + }); + + let h_account_changesets = ctx.write_account_changesets.then(|| { + self.spawn_segment_writer( + s, + StaticFileSegment::AccountChangeSets, + first_block_number, + |w| Self::write_account_changesets(w, blocks), + ) + }); + + h_headers.join().map_err(|_| StaticFileWriterError::ThreadPanic("headers"))??; + h_txs.join().map_err(|_| StaticFileWriterError::ThreadPanic("transactions"))??; + if let Some(h) = h_senders { + h.join().map_err(|_| StaticFileWriterError::ThreadPanic("senders"))??; + } + if let Some(h) = h_receipts { + h.join().map_err(|_| StaticFileWriterError::ThreadPanic("receipts"))??; + } + if let Some(h) = h_account_changesets { + h.join() + .map_err(|_| StaticFileWriterError::ThreadPanic("account_changesets"))??; + } + Ok(()) + }) + } + /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be /// either block or transaction. 
pub fn get_segment_provider( diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index a20dd6a3ff..aa5b61171a 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -1,6 +1,7 @@ mod manager; pub use manager::{ - StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter, + StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriteCtx, + StaticFileWriter, }; mod jar; diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 1d893e4291..869554cc79 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -206,6 +206,8 @@ pub struct StaticFileProviderRW { metrics: Option>, /// On commit, contains the pruning strategy to apply for the segment. prune_on_commit: Option, + /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs. + synced: bool, } impl StaticFileProviderRW { @@ -227,6 +229,7 @@ impl StaticFileProviderRW { reader, metrics, prune_on_commit: None, + synced: false, }; writer.ensure_end_range_consistency()?; @@ -335,12 +338,13 @@ impl StaticFileProviderRW { if self.writer.is_dirty() { self.writer.sync_all().map_err(ProviderError::other)?; } + self.synced = true; Ok(()) } /// Commits configuration to disk and updates the reader index. /// - /// Must be called after [`Self::sync_all`] to complete the commit. + /// If `sync_all()` was not called, this will call it first to ensure data is persisted. /// /// Returns an error if prune is queued (use [`Self::commit`] instead). 
pub fn finalize(&mut self) -> ProviderResult<()> { @@ -348,9 +352,14 @@ impl StaticFileProviderRW { return Err(StaticFileWriterError::FinalizeWithPruneQueued.into()); } if self.writer.is_dirty() { + if !self.synced { + self.writer.sync_all().map_err(ProviderError::other)?; + } + self.writer.finalize().map_err(ProviderError::other)?; self.update_index()?; } + self.synced = false; Ok(()) } diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 0c67634dbf..c361bfc7af 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -13,7 +13,9 @@ mod tests { use reth_ethereum_primitives::Receipt; use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{Account, StorageEntry}; - use reth_storage_api::{DatabaseProviderFactory, HashedPostStateProvider, StateWriter}; + use reth_storage_api::{ + DatabaseProviderFactory, HashedPostStateProvider, StateWriteConfig, StateWriter, + }; use reth_trie::{ test_utils::{state_root, storage_root_prehashed}, HashedPostState, HashedStorage, StateRoot, StorageRoot, StorageRootProgress, @@ -135,7 +137,7 @@ mod tests { provider.write_state_changes(plain_state).expect("Could not write plain state to DB"); assert_eq!(reverts.storage, [[]]); - provider.write_state_reverts(reverts, 1).expect("Could not write reverts to DB"); + provider.write_state_reverts(reverts, 1, StateWriteConfig::default()).expect("Could not write reverts to DB"); let reth_account_a = account_a.into(); let reth_account_b = account_b.into(); @@ -201,7 +203,7 @@ mod tests { reverts.storage, [[PlainStorageRevert { address: address_b, wiped: true, storage_revert: vec![] }]] ); - provider.write_state_reverts(reverts, 2).expect("Could not write reverts to DB"); + provider.write_state_reverts(reverts, 2, StateWriteConfig::default()).expect("Could not write reverts to DB"); // Check new plain state for account B assert_eq!( @@ -280,7 +282,7 @@ mod tests { let outcome = 
ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); // Check plain storage state @@ -380,7 +382,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 2, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); assert_eq!( @@ -448,7 +450,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -607,7 +609,7 @@ mod tests { let outcome: ExecutionOutcome = ExecutionOutcome::new(bundle, Default::default(), 1, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider @@ -773,7 +775,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -822,7 +824,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new()); provider - 
.write_state(&outcome, OriginalValuesKnown::Yes) + .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default()) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider diff --git a/crates/storage/storage-api/src/state_writer.rs b/crates/storage/storage-api/src/state_writer.rs index 711b9e569f..3daab1a85a 100644 --- a/crates/storage/storage-api/src/state_writer.rs +++ b/crates/storage/storage-api/src/state_writer.rs @@ -12,21 +12,26 @@ pub trait StateWriter { /// Receipt type included into [`ExecutionOutcome`]. type Receipt; - /// Write the state and receipts to the database or static files if `static_file_producer` is - /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts. + /// Write the state and optionally receipts to the database. + /// + /// Use `config` to skip writing certain data types when they are written elsewhere. fn write_state( &self, execution_outcome: &ExecutionOutcome, is_value_known: OriginalValuesKnown, + config: StateWriteConfig, ) -> ProviderResult<()>; /// Write state reverts to the database. /// /// NOTE: Reverts will delete all wiped storage from plain state. + /// + /// Use `config` to skip writing certain data types when they are written elsewhere. fn write_state_reverts( &self, reverts: PlainStateReverts, first_block: BlockNumber, + config: StateWriteConfig, ) -> ProviderResult<()>; /// Write state changes to the database. @@ -46,3 +51,20 @@ pub trait StateWriter { block: BlockNumber, ) -> ProviderResult>; } + +/// Configuration for what to write when calling [`StateWriter::write_state`]. +/// +/// Used to skip writing certain data types, when they are being written separately. +#[derive(Debug, Clone, Copy)] +pub struct StateWriteConfig { + /// Whether to write receipts. + pub write_receipts: bool, + /// Whether to write account changesets. 
+ pub write_account_changesets: bool, +} + +impl Default for StateWriteConfig { + fn default() -> Self { + Self { write_receipts: true, write_account_changesets: true } + } +} diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 1ecbe9a3b1..6d8dbc6827 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -17,7 +17,7 @@ use reth_primitives_traits::{Block as BlockTrait, RecoveredBlock, SealedBlock}; use reth_provider::{ test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, DatabaseProviderFactory, ExecutionOutcome, HeaderProvider, HistoryWriter, OriginalValuesKnown, StateProofProvider, - StateWriter, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter, + StateWriteConfig, StateWriter, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter, }; use reth_revm::{database::StateProviderDatabase, witness::ExecutionWitnessRecord, State}; use reth_stateless::{ @@ -325,7 +325,11 @@ fn run_case( // Commit the post state/state diff to the database provider - .write_state(&ExecutionOutcome::single(block.number, output), OriginalValuesKnown::Yes) + .write_state( + &ExecutionOutcome::single(block.number, output), + OriginalValuesKnown::Yes, + StateWriteConfig::default(), + ) .map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?; provider