mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat: parallelize save_blocks (#20993)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: Brian Picciano <me@mediocregopher.com>
@@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
 use reth_primitives_traits::NodePrimitives;
 use reth_provider::{
     providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
-    DBProvider, DatabaseProviderFactory, ProviderFactory,
+    DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
 };
 use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
 use reth_stages_api::{MetricEvent, MetricEventsSender};
@@ -151,7 +151,7 @@ where
         if last_block.is_some() {
             let provider_rw = self.provider.database_provider_rw()?;
-            provider_rw.save_blocks(blocks)?;
+            provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
             provider_rw.commit()?;
         }
@@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
 use reth_primitives_traits::NodePrimitives;
 use reth_provider::{
     providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown,
-    ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter,
+    ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter,
     StaticFileProviderFactory, StatsReader,
 };
 use reth_stages::{StageCheckpoint, StageId};
@@ -228,7 +228,11 @@ where
             ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());

         // finally, write the receipts
-        provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
+        provider.write_state(
+            &execution_outcome,
+            OriginalValuesKnown::Yes,
+            StateWriteConfig::default(),
+        )?;
     }

     // Only commit if we have imported as many receipts as the number of transactions.
@@ -12,7 +12,7 @@ use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
 use reth_provider::{
     providers::{StaticFileProvider, StaticFileWriter},
     BlockHashReader, BlockReader, DBProvider, EitherWriter, ExecutionOutcome, HeaderProvider,
-    LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter,
+    LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriteConfig, StateWriter,
     StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionVariant,
 };
 use reth_revm::database::StateProviderDatabase;
@@ -463,7 +463,7 @@ where
         }

         // write output
-        provider.write_state(&state, OriginalValuesKnown::Yes)?;
+        provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?;

         let db_write_duration = time.elapsed();
         debug!(
@@ -16,8 +16,8 @@ use reth_provider::{
     errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
     BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
     HashingWriter, HeaderProvider, HistoryWriter, MetadataWriter, OriginalValuesKnown,
-    ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriter,
-    StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
+    ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
+    StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
 };
 use reth_stages_types::{StageCheckpoint, StageId};
 use reth_static_file_types::StaticFileSegment;
@@ -334,7 +334,11 @@ where
             Vec::new(),
         );

-        provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
+        provider.write_state(
+            &execution_outcome,
+            OriginalValuesKnown::Yes,
+            StateWriteConfig::default(),
+        )?;

         trace!(target: "reth::cli", "Inserted state");
@@ -225,6 +225,9 @@ pub enum StaticFileWriterError {
     /// Cannot call `sync_all` or `finalize` when prune is queued.
     #[error("cannot call sync_all or finalize when prune is queued, use commit() instead")]
     FinalizeWithPruneQueued,
+    /// Thread panicked during execution.
+    #[error("thread panicked: {_0}")]
+    ThreadPanic(&'static str),
     /// Other error with message.
     #[error("{0}")]
     Other(String),
@@ -58,6 +58,9 @@ impl TxnManager {
             match rx.recv() {
                 Ok(msg) => match msg {
                     TxnManagerMessage::Begin { parent, flags, sender } => {
+                        let _span =
+                            tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
+                                .entered();
                         let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
                         let res = mdbx_result(unsafe {
                             ffi::mdbx_txn_begin_ex(
@@ -72,9 +75,13 @@ impl TxnManager {
                         sender.send(res).unwrap();
                     }
                     TxnManagerMessage::Abort { tx, sender } => {
+                        let _span =
+                            tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
                         sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
                     }
                     TxnManagerMessage::Commit { tx, sender } => {
+                        let _span =
+                            tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
                         sender
                             .send({
                                 let mut latency = CommitLatency::new();
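The guard idiom in those added lines is tracing's entered-span pattern: debug_span!(...).entered() returns an EnteredSpan that keeps the span active until the guard is dropped at the end of the scope, so the whole FFI call is covered. A minimal standalone sketch of the idiom, assuming the tracing and tracing-subscriber crates (commit_txn is illustrative, not part of this diff):

    use tracing::debug_span;

    fn commit_txn() {
        // Bind the guard to a named variable: `let _ = ...` would drop it
        // immediately and close the span before the work below runs.
        let _span = debug_span!(target: "libmdbx::txn", "commit").entered();
        // ... the commit work happens while the span is entered ...
    }

    fn main() {
        tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init();
        commit_txn();
    }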
@@ -429,6 +429,41 @@ where
         }
     }

+    /// Puts multiple transaction hash number mappings in a batch.
+    ///
+    /// Accepts a vector of `(TxHash, TxNumber)` tuples and writes them all using the same cursor.
+    /// This is more efficient than calling `put_transaction_hash_number` repeatedly.
+    ///
+    /// When `append_only` is true, uses `cursor.append()`, which requires entries to be
+    /// pre-sorted and the table to be empty or have only lower keys.
+    /// When false, uses `cursor.upsert()`, which handles arbitrary insertion order.
+    pub fn put_transaction_hash_numbers_batch(
+        &mut self,
+        entries: Vec<(TxHash, TxNumber)>,
+        append_only: bool,
+    ) -> ProviderResult<()> {
+        match self {
+            Self::Database(cursor) => {
+                for (hash, tx_num) in entries {
+                    if append_only {
+                        cursor.append(hash, &tx_num)?;
+                    } else {
+                        cursor.upsert(hash, &tx_num)?;
+                    }
+                }
+                Ok(())
+            }
+            Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
+            #[cfg(all(unix, feature = "rocksdb"))]
+            Self::RocksDB(batch) => {
+                for (hash, tx_num) in entries {
+                    batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)?;
+                }
+                Ok(())
+            }
+        }
+    }
+
     /// Deletes a transaction hash number mapping.
     pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
         match self {
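The append_only flag mirrors the underlying cursor semantics: append is the fast path but only accepts keys that sort above everything already in the table, while upsert tolerates arbitrary order. A self-contained sketch of that contract over a plain ordered map (MockCursor is a hypothetical stand-in, not a reth type):

    use std::collections::BTreeMap;

    struct MockCursor {
        table: BTreeMap<[u8; 32], u64>,
    }

    impl MockCursor {
        /// Fast path: valid only if `key` sorts after every existing key.
        fn append(&mut self, key: [u8; 32], value: u64) -> Result<(), String> {
            if self.table.last_key_value().is_some_and(|(k, _)| *k >= key) {
                return Err("append requires strictly increasing keys".into());
            }
            self.table.insert(key, value);
            Ok(())
        }

        /// Slow path: handles arbitrary insertion order.
        fn upsert(&mut self, key: [u8; 32], value: u64) {
            self.table.insert(key, value);
        }
    }

    fn main() {
        let mut cursor = MockCursor { table: BTreeMap::new() };
        // Pre-sorting by key is what makes the append fast path usable.
        let mut entries = vec![([2u8; 32], 7u64), ([1u8; 32], 6u64)];
        entries.sort_unstable_by_key(|(hash, _)| *hash);
        for (hash, num) in entries {
            cursor.append(hash, num).expect("sorted input");
        }
        cursor.upsert([0u8; 32], 5); // out-of-order insert is fine on this path
    }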
@@ -21,7 +21,8 @@ pub mod providers;
 pub use providers::{
     DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
     HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
-    StaticFileAccess, StaticFileProviderBuilder, StaticFileWriter,
+    SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
+    StaticFileWriter,
 };

 pub mod changeset_walker;
@@ -44,8 +45,8 @@ pub use revm_database::states::OriginalValuesKnown;
 // reexport traits to avoid breaking changes
 pub use reth_static_file_types as static_file;
 pub use reth_storage_api::{
-    HistoryWriter, MetadataProvider, MetadataWriter, StatsReader, StorageSettings,
-    StorageSettingsCache,
+    HistoryWriter, MetadataProvider, MetadataWriter, StateWriteConfig, StatsReader,
+    StorageSettings, StorageSettingsCache,
 };
 /// Re-export provider error.
 pub use reth_storage_errors::provider::{ProviderError, ProviderResult};
@@ -789,7 +789,7 @@ mod tests {
             create_test_provider_factory, create_test_provider_factory_with_chain_spec,
             MockNodeTypesWithDB,
         },
-        BlockWriter, CanonChainTracker, ProviderFactory,
+        BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode,
     };
     use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
     use alloy_primitives::{BlockNumber, TxNumber, B256};
@@ -808,8 +808,8 @@ mod tests {
     use reth_storage_api::{
         BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
         BlockReaderIdExt, BlockSource, ChangeSetReader, DBProvider, DatabaseProviderFactory,
-        HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, StateWriter,
-        TransactionVariant, TransactionsProvider,
+        HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory,
+        StateWriteConfig, StateWriter, TransactionVariant, TransactionsProvider,
     };
     use reth_testing_utils::generators::{
         self, random_block, random_block_range, random_changeset_range, random_eoa_accounts,
@@ -907,6 +907,7 @@ mod tests {
                 ..Default::default()
             },
             OriginalValuesKnown::No,
+            StateWriteConfig::default(),
         )?;
     }
@@ -997,7 +998,7 @@ mod tests {

         // Push to disk
         let provider_rw = hook_provider.database_provider_rw().unwrap();
-        provider_rw.save_blocks(vec![lowest_memory_block]).unwrap();
+        provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
         provider_rw.commit().unwrap();

         // Remove from memory
@@ -40,16 +40,8 @@ pub(crate) enum Action {
     InsertHeaderNumbers,
     InsertBlockBodyIndices,
     InsertTransactionBlocks,
     GetNextTxNum,
     InsertTransactionSenders,
     InsertTransactionHashNumbers,
-    SaveBlocksInsertBlock,
-    SaveBlocksWriteState,
-    SaveBlocksWriteHashedState,
-    SaveBlocksWriteTrieChangesets,
-    SaveBlocksWriteTrieUpdates,
-    SaveBlocksUpdateHistoryIndices,
-    SaveBlocksUpdatePipelineStages,
 }

 /// Database provider metrics
@@ -66,19 +58,24 @@ pub(crate) struct DatabaseProviderMetrics {
     insert_history_indices: Histogram,
     /// Duration of update pipeline stages
     update_pipeline_stages: Histogram,
-    /// Duration of insert canonical headers
     /// Duration of insert header numbers
     insert_header_numbers: Histogram,
     /// Duration of insert block body indices
     insert_block_body_indices: Histogram,
     /// Duration of insert transaction blocks
     insert_tx_blocks: Histogram,
     /// Duration of get next tx num
     get_next_tx_num: Histogram,
     /// Duration of insert transaction senders
     insert_transaction_senders: Histogram,
     /// Duration of insert transaction hash numbers
     insert_transaction_hash_numbers: Histogram,
+    /// Duration of `save_blocks`
+    save_blocks_total: Histogram,
+    /// Duration of MDBX work in `save_blocks`
+    save_blocks_mdbx: Histogram,
+    /// Duration of static file work in `save_blocks`
+    save_blocks_sf: Histogram,
+    /// Duration of `RocksDB` work in `save_blocks`
+    save_blocks_rocksdb: Histogram,
     /// Duration of `insert_block` in `save_blocks`
     save_blocks_insert_block: Histogram,
     /// Duration of `write_state` in `save_blocks`
@@ -93,6 +90,39 @@ pub(crate) struct DatabaseProviderMetrics {
     save_blocks_update_history_indices: Histogram,
     /// Duration of `update_pipeline_stages` in `save_blocks`
     save_blocks_update_pipeline_stages: Histogram,
+    /// Number of blocks per `save_blocks` call
+    save_blocks_block_count: Histogram,
+    /// Duration of MDBX commit in `save_blocks`
+    save_blocks_commit_mdbx: Histogram,
+    /// Duration of static file commit in `save_blocks`
+    save_blocks_commit_sf: Histogram,
+    /// Duration of `RocksDB` commit in `save_blocks`
+    save_blocks_commit_rocksdb: Histogram,
 }

+/// Timings collected during a `save_blocks` call.
+#[derive(Debug, Default)]
+pub(crate) struct SaveBlocksTimings {
+    pub total: Duration,
+    pub mdbx: Duration,
+    pub sf: Duration,
+    pub rocksdb: Duration,
+    pub insert_block: Duration,
+    pub write_state: Duration,
+    pub write_hashed_state: Duration,
+    pub write_trie_changesets: Duration,
+    pub write_trie_updates: Duration,
+    pub update_history_indices: Duration,
+    pub update_pipeline_stages: Duration,
+    pub block_count: u64,
+}
+
+/// Timings collected during a `commit` call.
+#[derive(Debug, Default)]
+pub(crate) struct CommitTimings {
+    pub mdbx: Duration,
+    pub sf: Duration,
+    pub rocksdb: Duration,
+}
+
 impl DatabaseProviderMetrics {
@@ -107,28 +137,33 @@ impl DatabaseProviderMetrics {
             Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration),
             Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration),
             Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration),
             Action::GetNextTxNum => self.get_next_tx_num.record(duration),
             Action::InsertTransactionSenders => self.insert_transaction_senders.record(duration),
             Action::InsertTransactionHashNumbers => {
                 self.insert_transaction_hash_numbers.record(duration)
             }
-            Action::SaveBlocksInsertBlock => self.save_blocks_insert_block.record(duration),
-            Action::SaveBlocksWriteState => self.save_blocks_write_state.record(duration),
-            Action::SaveBlocksWriteHashedState => {
-                self.save_blocks_write_hashed_state.record(duration)
-            }
-            Action::SaveBlocksWriteTrieChangesets => {
-                self.save_blocks_write_trie_changesets.record(duration)
-            }
-            Action::SaveBlocksWriteTrieUpdates => {
-                self.save_blocks_write_trie_updates.record(duration)
-            }
-            Action::SaveBlocksUpdateHistoryIndices => {
-                self.save_blocks_update_history_indices.record(duration)
-            }
-            Action::SaveBlocksUpdatePipelineStages => {
-                self.save_blocks_update_pipeline_stages.record(duration)
-            }
         }
     }

+    /// Records all `save_blocks` timings.
+    pub(crate) fn record_save_blocks(&self, timings: &SaveBlocksTimings) {
+        self.save_blocks_total.record(timings.total);
+        self.save_blocks_mdbx.record(timings.mdbx);
+        self.save_blocks_sf.record(timings.sf);
+        self.save_blocks_rocksdb.record(timings.rocksdb);
+        self.save_blocks_insert_block.record(timings.insert_block);
+        self.save_blocks_write_state.record(timings.write_state);
+        self.save_blocks_write_hashed_state.record(timings.write_hashed_state);
+        self.save_blocks_write_trie_changesets.record(timings.write_trie_changesets);
+        self.save_blocks_write_trie_updates.record(timings.write_trie_updates);
+        self.save_blocks_update_history_indices.record(timings.update_history_indices);
+        self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages);
+        self.save_blocks_block_count.record(timings.block_count as f64);
+    }
+
+    /// Records all commit timings.
+    pub(crate) fn record_commit(&self, timings: &CommitTimings) {
+        self.save_blocks_commit_mdbx.record(timings.mdbx);
+        self.save_blocks_commit_sf.record(timings.sf);
+        self.save_blocks_commit_rocksdb.record(timings.rocksdb);
+    }
 }
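The overall shape of this metrics change: per-phase durations are accumulated into a plain struct on the hot path and flushed to histograms once per save_blocks call, instead of one record_duration call per action per block. A self-contained sketch of that accumulate-then-record pattern (the histogram sink is stubbed with a print):

    use std::time::{Duration, Instant};

    #[derive(Debug, Default)]
    struct SaveBlocksTimings {
        insert_block: Duration,
        write_state: Duration,
        block_count: u64,
    }

    // Stand-in for the histogram `record` calls.
    fn record(timings: &SaveBlocksTimings) {
        println!("{timings:?}");
    }

    fn main() {
        let mut timings = SaveBlocksTimings { block_count: 3, ..Default::default() };
        for _ in 0..timings.block_count {
            let start = Instant::now();
            // ... insert one block ...
            timings.insert_block += start.elapsed();

            let start = Instant::now();
            // ... write its state ...
            timings.write_state += start.elapsed();
        }
        // One flush per save_blocks invocation, off the per-block path.
        record(&timings);
    }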
@@ -43,7 +43,7 @@ use std::{
 use tracing::trace;

 mod provider;
-pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
+pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};

 use super::ProviderNodeTypes;
 use reth_trie::KeccakKeyHasher;
@@ -5,7 +5,7 @@ use crate::{
     providers::{
         database::{chain::ChainStorage, metrics},
         rocksdb::RocksDBProvider,
-        static_file::StaticFileWriter,
+        static_file::{StaticFileWriteCtx, StaticFileWriter},
         NodeTypesForProvider, StaticFileProvider,
     },
     to_range,
@@ -35,7 +35,7 @@ use alloy_primitives::{
 use itertools::Itertools;
 use parking_lot::RwLock;
 use rayon::slice::ParallelSliceMut;
-use reth_chain_state::ExecutedBlock;
+use reth_chain_state::{ComputedTrieData, ExecutedBlock};
 use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
 use reth_db_api::{
     cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
@@ -61,10 +61,10 @@ use reth_stages_types::{StageCheckpoint, StageId};
 use reth_static_file_types::StaticFileSegment;
 use reth_storage_api::{
     BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
-    NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
-    TryIntoHistoricalStateProvider,
+    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
+    StorageSettingsCache, TryIntoHistoricalStateProvider,
 };
-use reth_storage_errors::provider::ProviderResult;
+use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
 use reth_trie::{
     trie_cursor::{
         InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
@@ -85,9 +85,10 @@ use std::{
     fmt::Debug,
     ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
     sync::Arc,
-    time::{Duration, Instant},
+    thread,
+    time::Instant,
 };
-use tracing::{debug, trace};
+use tracing::{debug, instrument, trace};

 /// A [`DatabaseProvider`] that holds a read-only database transaction.
 pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
@@ -150,6 +151,25 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
     }
 }

+/// Mode for [`DatabaseProvider::save_blocks`].
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum SaveBlocksMode {
+    /// Full mode: write block structure + receipts + state + trie.
+    /// Used by engine/production code.
+    Full,
+    /// Blocks only: write block structure (headers, txs, senders, indices).
+    /// Receipts/state/trie are skipped - they may come later via separate calls.
+    /// Used by `insert_block`.
+    BlocksOnly,
+}
+
+impl SaveBlocksMode {
+    /// Returns `true` if this is [`SaveBlocksMode::Full`].
+    pub const fn with_state(self) -> bool {
+        matches!(self, Self::Full)
+    }
+}
+
 /// A provider struct that fetches data from the database.
 /// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
 pub struct DatabaseProvider<TX, N: NodeTypes> {
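A condensed sketch of how the two modes are meant to be driven, restating the doc comments above (the write steps are reduced to comments):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum SaveBlocksMode {
        Full,
        BlocksOnly,
    }

    impl SaveBlocksMode {
        const fn with_state(self) -> bool {
            matches!(self, Self::Full)
        }
    }

    fn save_blocks(mode: SaveBlocksMode) {
        // Block structure is always written:
        // headers, transactions, senders, indices.
        if mode.with_state() {
            // Receipts, state, and trie writes happen only in Full mode.
        }
    }

    fn main() {
        save_blocks(SaveBlocksMode::Full);       // engine / production path
        save_blocks(SaveBlocksMode::BlocksOnly); // insert_block path
    }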
@@ -356,98 +376,257 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
         Ok(result)
     }

+    /// Creates the context for static file writes.
+    fn static_file_write_ctx(
+        &self,
+        save_mode: SaveBlocksMode,
+        first_block: BlockNumber,
+        last_block: BlockNumber,
+    ) -> ProviderResult<StaticFileWriteCtx> {
+        let tip = self.last_block_number()?.max(last_block);
+        Ok(StaticFileWriteCtx {
+            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
+                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
+            write_receipts: save_mode.with_state() &&
+                EitherWriter::receipts_destination(self).is_static_file(),
+            write_account_changesets: save_mode.with_state() &&
+                EitherWriterDestination::account_changesets(self).is_static_file(),
+            tip,
+            receipts_prune_mode: self.prune_modes.receipts,
+            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
+            receipts_prunable: self
+                .static_file_provider
+                .get_highest_static_file_tx(StaticFileSegment::Receipts)
+                .is_none() &&
+                PruneMode::Distance(self.minimum_pruning_distance)
+                    .should_prune(first_block, tip),
+        })
+    }
+
     /// Writes executed blocks and state to storage.
-    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
+    ///
+    /// This method parallelizes static file (SF) writes with MDBX writes.
+    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
+    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
+    ///
+    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
+    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
+    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
+    pub fn save_blocks(
+        &self,
+        blocks: Vec<ExecutedBlock<N::Primitives>>,
+        save_mode: SaveBlocksMode,
+    ) -> ProviderResult<()> {
         if blocks.is_empty() {
             debug!(target: "providers::db", "Attempted to write empty block range");
             return Ok(())
         }

+        let total_start = Instant::now();
+        let block_count = blocks.len() as u64;
         // NOTE: checked non-empty above
-        let first_block = blocks.first().unwrap().recovered_block();
-        let last_block = blocks.last().unwrap().recovered_block();
-        let first_number = first_block.number();
-        let last_block_number = last_block.number();
+        let first_number = blocks.first().unwrap().recovered_block().number();
+        let last_block_number = blocks.last().unwrap().recovered_block().number();

-        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
+        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

+        // Compute tx_nums upfront (both threads need these)
+        let first_tx_num = self
+            .tx
+            .cursor_read::<tables::TransactionBlocks>()?
+            .last()?
+            .map(|(n, _)| n + 1)
+            .unwrap_or_default();

-        // Accumulate durations for each step
-        let mut total_insert_block = Duration::ZERO;
-        let mut total_write_state = Duration::ZERO;
-        let mut total_write_hashed_state = Duration::ZERO;
-        let mut total_write_trie_changesets = Duration::ZERO;
-        let mut total_write_trie_updates = Duration::ZERO;
+        let tx_nums: Vec<TxNumber> = {
+            let mut nums = Vec::with_capacity(blocks.len());
+            let mut current = first_tx_num;
+            for block in &blocks {
+                nums.push(current);
+                current += block.recovered_block().body().transaction_count() as u64;
+            }
+            nums
+        };

-        // TODO: Do performant / batched writes for each type of object
-        // instead of a loop over all blocks,
-        // meaning:
-        // * blocks
-        // * state
-        // * hashed state
-        // * trie updates (cannot naively extend, need helper)
-        // * indices (already done basically)
-        // Insert the blocks
-        for block in blocks {
-            let trie_data = block.trie_data();
-            let ExecutedBlock { recovered_block, execution_output, .. } = block;
-            let block_number = recovered_block.number();
+        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };

+        // avoid capturing &self.tx in scope below.
+        let sf_provider = &self.static_file_provider;
+        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
+
+        thread::scope(|s| {
+            // SF writes
+            let sf_handle = s.spawn(|| {
+                let start = Instant::now();
+                sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
+                Ok::<_, ProviderError>(start.elapsed())
+            });
+
+            // MDBX writes
+            let mdbx_start = Instant::now();
+
+            // Collect all transaction hashes across all blocks, sort them, and write in batch
+            if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
+                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
+            {
+                let start = Instant::now();
+                let mut all_tx_hashes = Vec::new();
+                for (i, block) in blocks.iter().enumerate() {
+                    let recovered_block = block.recovered_block();
+                    let mut tx_num = tx_nums[i];
+                    for transaction in recovered_block.body().transactions_iter() {
+                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
+                        tx_num += 1;
+                    }
+                }
+
+                // Sort by hash for optimal MDBX insertion performance
+                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
+
+                // Write all transaction hash numbers in a single batch
+                self.with_rocksdb_batch(|batch| {
+                    let mut tx_hash_writer =
+                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
+                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
+                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
+                    Ok(((), raw_batch))
+                })?;
+                self.metrics.record_duration(
+                    metrics::Action::InsertTransactionHashNumbers,
+                    start.elapsed(),
+                );
+            }
+
+            for (i, block) in blocks.iter().enumerate() {
+                let recovered_block = block.recovered_block();
+
+                let start = Instant::now();
+                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
+                timings.insert_block += start.elapsed();
+
+                if save_mode.with_state() {
+                    let execution_output = block.execution_outcome();
+                    let block_number = recovered_block.number();
+
+                    // Write state and changesets to the database.
+                    // Must be written after blocks because of the receipt lookup.
+                    // Skip receipts/account changesets if they're being written to static files.
+                    let start = Instant::now();
+                    self.write_state(
+                        execution_output,
+                        OriginalValuesKnown::No,
+                        StateWriteConfig {
+                            write_receipts: !sf_ctx.write_receipts,
+                            write_account_changesets: !sf_ctx.write_account_changesets,
+                        },
+                    )?;
+                    timings.write_state += start.elapsed();
+
+                    let trie_data = block.trie_data();
+
+                    // insert hashes and intermediate merkle nodes
+                    let start = Instant::now();
+                    self.write_hashed_state(&trie_data.hashed_state)?;
+                    timings.write_hashed_state += start.elapsed();
+
+                    let start = Instant::now();
+                    self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
+                    timings.write_trie_changesets += start.elapsed();
+
+                    let start = Instant::now();
+                    self.write_trie_updates_sorted(&trie_data.trie_updates)?;
+                    timings.write_trie_updates += start.elapsed();
+                }
+            }
+
+            // Full mode: update history indices
+            if save_mode.with_state() {
+                let start = Instant::now();
+                self.update_history_indices(first_number..=last_block_number)?;
+                timings.update_history_indices = start.elapsed();
+            }
+
+            // Update pipeline progress
             let start = Instant::now();
-            self.insert_block(&recovered_block)?;
-            total_insert_block += start.elapsed();
+            self.update_pipeline_stages(last_block_number, false)?;
+            timings.update_pipeline_stages = start.elapsed();

-            // Write state and changesets to the database.
-            // Must be written after blocks because of the receipt lookup.
-            let start = Instant::now();
-            self.write_state(&execution_output, OriginalValuesKnown::No)?;
-            total_write_state += start.elapsed();
+            timings.mdbx = mdbx_start.elapsed();

-            // insert hashes and intermediate merkle nodes
-            let start = Instant::now();
-            self.write_hashed_state(&trie_data.hashed_state)?;
-            total_write_hashed_state += start.elapsed();
+            // Wait for SF thread
+            timings.sf = sf_handle
+                .join()
+                .map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;

-            let start = Instant::now();
-            self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
-            total_write_trie_changesets += start.elapsed();
+            timings.total = total_start.elapsed();

+            self.metrics.record_save_blocks(&timings);
+            debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
+
+            Ok(())
+        })
+    }
+
+    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
+    ///
+    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
+    #[instrument(level = "debug", target = "providers::db", skip_all)]
+    fn insert_block_mdbx_only(
+        &self,
+        block: &RecoveredBlock<BlockTy<N>>,
+        first_tx_num: TxNumber,
+    ) -> ProviderResult<StoredBlockBodyIndices> {
+        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
+            EitherWriterDestination::senders(self).is_database()
+        {
             let start = Instant::now();
-            self.write_trie_updates_sorted(&trie_data.trie_updates)?;
-            total_write_trie_updates += start.elapsed();
+            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
+            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
+            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
+                cursor.append(tx_num, &sender)?;
+            }
+            self.metrics
+                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
         }

-        // update history indices
+        let block_number = block.number();
+        let tx_count = block.body().transaction_count() as u64;
+
         let start = Instant::now();
-        self.update_history_indices(first_number..=last_block_number)?;
-        let duration_update_history_indices = start.elapsed();
+        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
+        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

-        // Update pipeline progress
+        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
+
+        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
+    }
+
+    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
+    /// `Ommers`/`Withdrawals`).
+    fn write_block_body_indices(
+        &self,
+        block_number: BlockNumber,
+        body: &BodyTy<N>,
+        first_tx_num: TxNumber,
+        tx_count: u64,
+    ) -> ProviderResult<()> {
+        // MDBX: BlockBodyIndices
         let start = Instant::now();
-        self.update_pipeline_stages(last_block_number, false)?;
-        let duration_update_pipeline_stages = start.elapsed();
+        self.tx
+            .cursor_write::<tables::BlockBodyIndices>()?
+            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
+        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());

-        // Record all metrics at the end
-        self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
-        self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
-        self.metrics
-            .record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state);
-        self.metrics.record_duration(
-            metrics::Action::SaveBlocksWriteTrieChangesets,
-            total_write_trie_changesets,
-        );
-        self.metrics
-            .record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates);
-        self.metrics.record_duration(
-            metrics::Action::SaveBlocksUpdateHistoryIndices,
-            duration_update_history_indices,
-        );
-        self.metrics.record_duration(
-            metrics::Action::SaveBlocksUpdatePipelineStages,
-            duration_update_pipeline_stages,
-        );
+        // MDBX: TransactionBlocks (last tx -> block mapping)
+        if tx_count > 0 {
+            let start = Instant::now();
+            self.tx
+                .cursor_write::<tables::TransactionBlocks>()?
+                .append(first_tx_num + tx_count - 1, &block_number)?;
+            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
+        }

-        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
+        // MDBX: Ommers/Withdrawals
+        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;

         Ok(())
     }
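The structural core of the new save_blocks is std::thread::scope: the static-file writes run on a spawned scoped thread that can borrow the blocks, the caller's thread does the MDBX work concurrently, and a panic on the writer thread surfaces as an error at the join point instead of unwinding the process. A self-contained sketch of that shape with placeholder data and error types:

    use std::thread;
    use std::time::{Duration, Instant};

    #[derive(Debug)]
    enum WriteError {
        ThreadPanic(&'static str),
    }

    fn write_static_files(blocks: &[u64]) -> Result<(), WriteError> {
        // ... headers / transactions / receipts ...
        let _ = blocks;
        Ok(())
    }

    fn write_mdbx(blocks: &[u64]) -> Result<(), WriteError> {
        // ... indices / state / trie ...
        let _ = blocks;
        Ok(())
    }

    fn save_blocks(blocks: &[u64]) -> Result<(Duration, Duration), WriteError> {
        thread::scope(|s| {
            // Scoped spawn: the closure may borrow `blocks` because the scope
            // guarantees the thread is joined before `save_blocks` returns.
            let sf_handle = s.spawn(|| {
                let start = Instant::now();
                write_static_files(blocks)?;
                Ok::<_, WriteError>(start.elapsed())
            });

            let mdbx_start = Instant::now();
            write_mdbx(blocks)?;
            let mdbx = mdbx_start.elapsed();

            // join() yields Err only if the thread panicked; map that to an
            // error value. The double `?` unwraps both the panic layer and the
            // writer's own Result.
            let sf = sf_handle.join().map_err(|_| WriteError::ThreadPanic("static file"))??;
            Ok((mdbx, sf))
        })
    }

    fn main() {
        let blocks = vec![1, 2, 3];
        let (mdbx, sf) = save_blocks(&blocks).unwrap();
        println!("mdbx: {mdbx:?}, sf: {sf:?}");
    }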
@@ -1727,6 +1906,7 @@ impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N
         Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
     }

+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn update_pipeline_stages(
         &self,
         block_number: BlockNumber,
@@ -1817,24 +1997,31 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
 {
     type Receipt = ReceiptTy<N>;

+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn write_state(
         &self,
         execution_outcome: &ExecutionOutcome<Self::Receipt>,
         is_value_known: OriginalValuesKnown,
+        config: StateWriteConfig,
     ) -> ProviderResult<()> {
         let first_block = execution_outcome.first_block();

+        let (plain_state, reverts) =
+            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
+
+        self.write_state_reverts(reverts, first_block, config)?;
+        self.write_state_changes(plain_state)?;
+
+        if !config.write_receipts {
+            return Ok(());
+        }
+
         let block_count = execution_outcome.len() as u64;
         let last_block = execution_outcome.last_block();
         let block_range = first_block..=last_block;

         let tip = self.last_block_number()?.max(last_block);

-        let (plain_state, reverts) =
-            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
-
-        self.write_state_reverts(reverts, first_block)?;
-        self.write_state_changes(plain_state)?;
-
         // Fetch the first transaction number for each block in the range
         let block_indices: Vec<_> = self
             .block_body_indices_range(block_range)?
@@ -1918,6 +2105,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
         &self,
         reverts: PlainStateReverts,
         first_block: BlockNumber,
+        config: StateWriteConfig,
     ) -> ProviderResult<()> {
         // Write storage changes
         tracing::trace!("Writing storage changes");
@@ -1965,7 +2153,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
             }
         }

+        // Write account changes to static files
+        if !config.write_account_changesets {
+            return Ok(());
+        }
+
         // Write account changes
         tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes");
         for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
             let block_number = first_block + block_index as BlockNumber;
@@ -2043,6 +2235,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
         Ok(())
     }

+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
         // Write hashed account updates.
         let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
@@ -2336,6 +2529,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
     /// Writes trie updates to the database with already sorted updates.
     ///
     /// Returns the number of entries modified.
+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
         if trie_updates.is_empty() {
             return Ok(0)
@@ -2379,6 +2573,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
     /// the same `TrieUpdates`.
     ///
     /// Returns the number of keys written.
+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn write_trie_changesets(
         &self,
         block_number: BlockNumber,
@@ -2970,6 +3165,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
         )
     }

+    #[instrument(level = "debug", target = "providers::db", skip_all)]
     fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
         // account history stage
         {
@@ -2987,7 +3183,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
     }
 }

-impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
+impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
     for DatabaseProvider<TX, N>
 {
     fn take_block_and_execution_above(
@@ -3030,89 +3226,40 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
     }
 }

-impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
+impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
     for DatabaseProvider<TX, N>
 {
     type Block = BlockTy<N>;
     type Receipt = ReceiptTy<N>;

-    /// Inserts the block into the database, always modifying the following static file segments and
-    /// tables:
-    /// * [`StaticFileSegment::Headers`]
-    /// * [`tables::HeaderNumbers`]
-    /// * [`tables::BlockBodyIndices`]
+    /// Inserts the block into the database, writing to both static files and MDBX.
     ///
-    /// If there are transactions in the block, the following static file segments and tables will
-    /// be modified:
-    /// * [`StaticFileSegment::Transactions`]
-    /// * [`tables::TransactionBlocks`]
-    ///
-    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
-    /// If withdrawals are not empty, this will modify
-    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
-    ///
-    /// If the provider has __not__ configured full sender pruning, this will modify either:
-    /// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
-    /// * [`tables::TransactionSenders`] if senders are written to the database
-    ///
-    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
-    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
+    /// This is a convenience method primarily used in tests. For production use,
+    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
     fn insert_block(
         &self,
         block: &RecoveredBlock<Self::Block>,
     ) -> ProviderResult<StoredBlockBodyIndices> {
         let block_number = block.number();
-        let tx_count = block.body().transaction_count() as u64;
-
-        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
-
-        self.static_file_provider
-            .get_writer(block_number, StaticFileSegment::Headers)?
-            .append_header(block.header(), &block.hash())?;
-
-        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
-        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
-
-        let first_tx_num = self
-            .tx
-            .cursor_read::<tables::TransactionBlocks>()?
-            .last()?
-            .map(|(n, _)| n + 1)
-            .unwrap_or_default();
-        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
-
-        let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
-
-        if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
-            let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
-            senders_writer.increment_block(block.number())?;
-            senders_writer
-                .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
-            durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
-        }
-
-        if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
-            self.with_rocksdb_batch(|batch| {
-                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
-                for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
-                    let hash = transaction.tx_hash();
-                    writer.put_transaction_hash_number(*hash, tx_num, false)?;
-                }
-                Ok(((), writer.into_raw_rocksdb_batch()))
-            })?;
-            durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
-        }
-
-        self.append_block_bodies(vec![(block_number, Some(block.body()))])?;
-
-        debug!(
-            target: "providers::db",
-            ?block_number,
-            actions = ?durations_recorder.actions,
-            "Inserted block"
-        );
+        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
+        let executed_block = ExecutedBlock::new(
+            Arc::new(block.clone()),
+            Arc::new(ExecutionOutcome::new(
+                Default::default(),
+                Vec::<Vec<ReceiptTy<N>>>::new(),
+                block_number,
+                vec![],
+            )),
+            ComputedTrieData::default(),
+        );

-        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
+        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
+        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
+
+        // Return the body indices
+        self.block_body_indices(block_number)?
+            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
     }

     fn append_block_bodies(
@@ -3298,7 +3445,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
             durations_recorder.record_relative(metrics::Action::InsertBlock);
         }

-        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
+        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
         durations_recorder.record_relative(metrics::Action::InsertState);

         // insert hashes and intermediate merkle nodes
@@ -3440,17 +3587,28 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider

             self.static_file_provider.commit()?;
         } else {
-            self.static_file_provider.commit()?;
+            // Normal path: finalize() will call sync_all() if not already synced
+            let mut timings = metrics::CommitTimings::default();
+
+            let start = Instant::now();
+            self.static_file_provider.finalize()?;
+            timings.sf = start.elapsed();

             #[cfg(all(unix, feature = "rocksdb"))]
             {
+                let start = Instant::now();
                 let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                 for batch in batches {
                     self.rocksdb_provider.commit_batch(batch)?;
                 }
+                timings.rocksdb = start.elapsed();
             }

+            let start = Instant::now();
             self.tx.commit()?;
+            timings.mdbx = start.elapsed();
+
+            self.metrics.record_commit(&timings);
         }

         Ok(())
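The commit path above times each store separately and commits the MDBX transaction last. A sketch of that ordering with the stores reduced to closures (the arguments are stand-ins for the real providers):

    use std::time::{Duration, Instant};

    #[derive(Debug, Default)]
    struct CommitTimings {
        mdbx: Duration,
        sf: Duration,
        rocksdb: Duration,
    }

    fn commit(
        finalize_static_files: impl FnOnce(),
        commit_rocksdb_batches: impl FnOnce(),
        commit_mdbx: impl FnOnce(),
    ) -> CommitTimings {
        let mut timings = CommitTimings::default();

        let start = Instant::now();
        finalize_static_files(); // cheap when sync_all() already ran on the write threads
        timings.sf = start.elapsed();

        let start = Instant::now();
        commit_rocksdb_batches();
        timings.rocksdb = start.elapsed();

        let start = Instant::now();
        commit_mdbx(); // the database transaction commits last
        timings.mdbx = start.elapsed();

        timings
    }

    fn main() {
        println!("{:?}", commit(|| {}, || {}, || {}));
    }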
@@ -3523,10 +3681,17 @@ mod tests {
             .write_state(
                 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                 crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
             )
             .unwrap();
         provider_rw.insert_block(&data.blocks[0].0).unwrap();
-        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
+        provider_rw
+            .write_state(
+                &data.blocks[0].1,
+                crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
+            )
+            .unwrap();
         provider_rw.commit().unwrap();

         let provider = factory.provider().unwrap();
@@ -3549,11 +3714,18 @@ mod tests {
             .write_state(
                 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                 crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
             )
             .unwrap();
         for i in 0..3 {
             provider_rw.insert_block(&data.blocks[i].0).unwrap();
-            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
+            provider_rw
+                .write_state(
+                    &data.blocks[i].1,
+                    crate::OriginalValuesKnown::No,
+                    StateWriteConfig::default(),
+                )
+                .unwrap();
         }
         provider_rw.commit().unwrap();

@@ -3579,13 +3751,20 @@ mod tests {
             .write_state(
                 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                 crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
             )
             .unwrap();

         // insert blocks 1-3 with receipts
         for i in 0..3 {
             provider_rw.insert_block(&data.blocks[i].0).unwrap();
-            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
+            provider_rw
+                .write_state(
+                    &data.blocks[i].1,
+                    crate::OriginalValuesKnown::No,
+                    StateWriteConfig::default(),
+                )
+                .unwrap();
         }
         provider_rw.commit().unwrap();

@@ -3610,11 +3789,18 @@ mod tests {
             .write_state(
                 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                 crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
             )
             .unwrap();
         for i in 0..3 {
             provider_rw.insert_block(&data.blocks[i].0).unwrap();
-            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
+            provider_rw
+                .write_state(
+                    &data.blocks[i].1,
+                    crate::OriginalValuesKnown::No,
+                    StateWriteConfig::default(),
+                )
+                .unwrap();
         }
         provider_rw.commit().unwrap();

@@ -3673,11 +3859,18 @@ mod tests {
             .write_state(
                 &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                 crate::OriginalValuesKnown::No,
+                StateWriteConfig::default(),
             )
             .unwrap();
         for i in 0..3 {
             provider_rw.insert_block(&data.blocks[i].0).unwrap();
-            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
+            provider_rw
+                .write_state(
+                    &data.blocks[i].1,
+                    crate::OriginalValuesKnown::No,
+                    StateWriteConfig::default(),
+                )
+                .unwrap();
         }
         provider_rw.commit().unwrap();

@@ -4991,7 +5184,9 @@ mod tests {
             }]],
             ..Default::default()
         };
-        provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
+        provider_rw
+            .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
+            .unwrap();
         provider_rw.commit().unwrap();
     };
@@ -10,7 +10,7 @@ pub use database::*;
 mod static_file;
 pub use static_file::{
     StaticFileAccess, StaticFileJarProvider, StaticFileProvider, StaticFileProviderBuilder,
-    StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriter,
+    StaticFileProviderRW, StaticFileProviderRWRefMut, StaticFileWriteCtx, StaticFileWriter,
 };

 mod state;
@@ -14,6 +14,7 @@ use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash,
 use dashmap::DashMap;
 use notify::{RecommendedWatcher, RecursiveMode, Watcher};
 use parking_lot::RwLock;
+use reth_chain_state::ExecutedBlock;
 use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
 use reth_db::{
     lockfile::StorageLock,
@@ -24,7 +25,7 @@ use reth_db::{
 };
 use reth_db_api::{
     cursor::DbCursorRO,
-    models::StoredBlockBodyIndices,
+    models::{AccountBeforeTx, StoredBlockBodyIndices},
     table::{Decompress, Table, Value},
     tables,
     transaction::DbTx,
@@ -32,7 +33,9 @@ use reth_db_api::{
 use reth_ethereum_primitives::{Receipt, TransactionSigned};
 use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
 use reth_node_types::NodePrimitives;
-use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
+use reth_primitives_traits::{
+    AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader, SignedTransaction,
+};
 use reth_stages_types::{PipelineTarget, StageId};
 use reth_static_file_types::{
     find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
@@ -41,15 +44,16 @@ use reth_static_file_types::{
 use reth_storage_api::{
     BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageSettingsCache,
 };
-use reth_storage_errors::provider::{ProviderError, ProviderResult};
+use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
 use std::{
     collections::BTreeMap,
     fmt::Debug,
     ops::{Deref, Range, RangeBounds, RangeInclusive},
     path::{Path, PathBuf},
     sync::{atomic::AtomicU64, mpsc, Arc},
+    thread,
 };
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};

 /// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
 /// represent either a block or a transaction number end of a static file range.
@@ -77,6 +81,25 @@ impl StaticFileAccess {
     }
 }

+/// Context for static file block writes.
+///
+/// Contains target segments and pruning configuration.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct StaticFileWriteCtx {
+    /// Whether transaction senders should be written to static files.
+    pub write_senders: bool,
+    /// Whether receipts should be written to static files.
+    pub write_receipts: bool,
+    /// Whether account changesets should be written to static files.
+    pub write_account_changesets: bool,
+    /// The current chain tip block number (for pruning).
+    pub tip: BlockNumber,
+    /// The prune mode for receipts, if any.
+    pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
+    /// Whether receipts are prunable (based on storage settings and prune distance).
+    pub receipts_prunable: bool,
+}
+
 /// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
 ///
 /// "Static files" contain immutable chain history data, such as:
@@ -504,6 +527,192 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes headers for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_headers(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
) -> ProviderResult<()> {
|
||||
for block in blocks {
|
||||
let b = block.recovered_block();
|
||||
w.append_header(b.header(), &b.hash())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes transactions for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_transactions(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
tx_nums: &[TxNumber],
|
||||
) -> ProviderResult<()> {
|
||||
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
|
||||
let b = block.recovered_block();
|
||||
w.increment_block(b.number())?;
|
||||
for (i, tx) in b.body().transactions().iter().enumerate() {
|
||||
w.append_transaction(first_tx + i as u64, tx)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes transaction senders for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_transaction_senders(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
tx_nums: &[TxNumber],
|
||||
) -> ProviderResult<()> {
|
||||
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
|
||||
let b = block.recovered_block();
|
||||
w.increment_block(b.number())?;
|
||||
for (i, sender) in b.senders_iter().enumerate() {
|
||||
w.append_transaction_sender(first_tx + i as u64, sender)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes receipts for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_receipts(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
tx_nums: &[TxNumber],
|
||||
ctx: &StaticFileWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
for (block, &first_tx) in blocks.iter().zip(tx_nums) {
|
||||
let block_number = block.recovered_block().number();
|
||||
w.increment_block(block_number)?;
|
||||
|
||||
// skip writing receipts if pruning configuration requires us to.
|
||||
if ctx.receipts_prunable &&
|
||||
ctx.receipts_prune_mode
|
||||
.is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
|
||||
{
|
||||
continue
|
||||
}
|
||||
|
||||
for (i, receipt) in block.execution_outcome().receipts.iter().flatten().enumerate() {
|
||||
w.append_receipt(first_tx + i as u64, receipt)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes account changesets for all blocks to the static file segment.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_account_changesets(
|
||||
w: &mut StaticFileProviderRWRefMut<'_, N>,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
) -> ProviderResult<()> {
|
||||
for block in blocks {
|
||||
let block_number = block.recovered_block().number();
|
||||
let reverts = block.execution_outcome().bundle.reverts.to_plain_state_reverts();
|
||||
|
||||
for account_block_reverts in reverts.accounts {
|
||||
let changeset = account_block_reverts
|
||||
.into_iter()
|
||||
.map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
|
||||
.collect::<Vec<_>>();
|
||||
w.append_account_changeset(changeset, block_number)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Spawns a scoped thread that writes to a static file segment using the provided closure.
|
||||
///
|
||||
/// The closure receives a mutable reference to the segment writer. After the closure completes,
|
||||
/// `sync_all()` is called to flush writes to disk.
|
||||
fn spawn_segment_writer<'scope, 'env, F>(
|
||||
&'env self,
|
||||
scope: &'scope thread::Scope<'scope, 'env>,
|
||||
segment: StaticFileSegment,
|
||||
first_block_number: BlockNumber,
|
||||
f: F,
|
||||
) -> thread::ScopedJoinHandle<'scope, ProviderResult<()>>
|
||||
where
|
||||
F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()> + Send + 'env,
|
||||
{
|
||||
scope.spawn(move || {
|
||||
let mut w = self.get_writer(first_block_number, segment)?;
|
||||
f(&mut w)?;
|
||||
w.sync_all()
|
||||
})
|
||||
}
|
||||

    /// Writes all static file data for multiple blocks, in parallel per segment.
    ///
    /// This spawns a separate thread for each segment type; each thread calls `sync_all()` on
    /// its writer when done.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    pub fn write_blocks_data(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: StaticFileWriteCtx,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        let first_block_number = blocks[0].recovered_block().number();

        thread::scope(|s| {
            let h_headers =
                self.spawn_segment_writer(s, StaticFileSegment::Headers, first_block_number, |w| {
                    Self::write_headers(w, blocks)
                });

            let h_txs = self.spawn_segment_writer(
                s,
                StaticFileSegment::Transactions,
                first_block_number,
                |w| Self::write_transactions(w, blocks, tx_nums),
            );

            let h_senders = ctx.write_senders.then(|| {
                self.spawn_segment_writer(
                    s,
                    StaticFileSegment::TransactionSenders,
                    first_block_number,
                    |w| Self::write_transaction_senders(w, blocks, tx_nums),
                )
            });

            let h_receipts = ctx.write_receipts.then(|| {
                self.spawn_segment_writer(s, StaticFileSegment::Receipts, first_block_number, |w| {
                    Self::write_receipts(w, blocks, tx_nums, &ctx)
                })
            });

            let h_account_changesets = ctx.write_account_changesets.then(|| {
                self.spawn_segment_writer(
                    s,
                    StaticFileSegment::AccountChangeSets,
                    first_block_number,
                    |w| Self::write_account_changesets(w, blocks),
                )
            });

            h_headers.join().map_err(|_| StaticFileWriterError::ThreadPanic("headers"))??;
            h_txs.join().map_err(|_| StaticFileWriterError::ThreadPanic("transactions"))??;
            if let Some(h) = h_senders {
                h.join().map_err(|_| StaticFileWriterError::ThreadPanic("senders"))??;
            }
            if let Some(h) = h_receipts {
                h.join().map_err(|_| StaticFileWriterError::ThreadPanic("receipts"))??;
            }
            if let Some(h) = h_account_changesets {
                h.join()
                    .map_err(|_| StaticFileWriterError::ThreadPanic("account_changesets"))??;
            }
            Ok(())
        })
    }
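
    // A minimal caller-side sketch of the parallel write path above, fully commented out so it
    // stays annotation rather than part of the diff. `StaticFileWriteCtx` is only partially
    // visible here, so the field set below is an assumption inferred from the fields this code
    // reads (`write_senders`, `write_receipts`, `write_account_changesets`, `receipts_prunable`,
    // `receipts_prune_mode`, `tip`); treat it as illustrative, not a definitive constructor.
    //
    //     let ctx = StaticFileWriteCtx {
    //         write_senders: true,
    //         write_receipts: true,
    //         write_account_changesets: false, // e.g. changesets still go to the database
    //         receipts_prunable: false,
    //         receipts_prune_mode: None,
    //         tip: 0,
    //     };
    //     // One scoped thread per segment; returns once every segment writer has synced.
    //     sf_provider.write_blocks_data(&blocks, &tx_nums, ctx)?;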

    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
    /// either block or transaction.
    pub fn get_segment_provider(

@@ -1,6 +1,7 @@
mod manager;
pub use manager::{
    StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter,
    StaticFileAccess, StaticFileProvider, StaticFileProviderBuilder, StaticFileWriteCtx,
    StaticFileWriter,
};

mod jar;

@@ -206,6 +206,8 @@ pub struct StaticFileProviderRW<N> {
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, contains the pruning strategy to apply for the segment.
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
    synced: bool,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
@@ -227,6 +229,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            reader,
            metrics,
            prune_on_commit: None,
            synced: false,
        };

        writer.ensure_end_range_consistency()?;
@@ -335,12 +338,13 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        if self.writer.is_dirty() {
            self.writer.sync_all().map_err(ProviderError::other)?;
        }
        self.synced = true;
        Ok(())
    }

    /// Commits configuration to disk and updates the reader index.
    ///
    /// Must be called after [`Self::sync_all`] to complete the commit.
    /// If `sync_all()` was not called, this will call it first to ensure data is persisted.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    pub fn finalize(&mut self) -> ProviderResult<()> {
@@ -348,9 +352,14 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }
        if self.writer.is_dirty() {
            if !self.synced {
                self.writer.sync_all().map_err(ProviderError::other)?;
            }

            self.writer.finalize().map_err(ProviderError::other)?;
            self.update_index()?;
        }
        self.synced = false;
        Ok(())
    }
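
    // A hedged sketch of the intended call order with the new `synced` flag (the writer
    // variable is illustrative; the methods are the ones shown in this diff). `sync_all()`
    // flushes and records the sync, so a following `finalize()` skips the redundant flush;
    // calling `finalize()` alone still syncs first because `synced` is false.
    //
    //     writer.sync_all()?;  // flush to disk, sets `synced = true`
    //     writer.finalize()?;  // sees `synced == true`, skips the extra sync, updates the index
    //
    //     // or, equivalently:
    //     writer.finalize()?;  // `synced` is false, so this syncs first, then finalizes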

@@ -13,7 +13,9 @@ mod tests {
    use reth_ethereum_primitives::Receipt;
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::{Account, StorageEntry};
    use reth_storage_api::{DatabaseProviderFactory, HashedPostStateProvider, StateWriter};
    use reth_storage_api::{
        DatabaseProviderFactory, HashedPostStateProvider, StateWriteConfig, StateWriter,
    };
    use reth_trie::{
        test_utils::{state_root, storage_root_prehashed},
        HashedPostState, HashedStorage, StateRoot, StorageRoot, StorageRootProgress,
@@ -135,7 +137,7 @@ mod tests {
        provider.write_state_changes(plain_state).expect("Could not write plain state to DB");

        assert_eq!(reverts.storage, [[]]);
        provider.write_state_reverts(reverts, 1).expect("Could not write reverts to DB");
        provider.write_state_reverts(reverts, 1, StateWriteConfig::default()).expect("Could not write reverts to DB");

        let reth_account_a = account_a.into();
        let reth_account_b = account_b.into();
@@ -201,7 +203,7 @@ mod tests {
            reverts.storage,
            [[PlainStorageRevert { address: address_b, wiped: true, storage_revert: vec![] }]]
        );
        provider.write_state_reverts(reverts, 2).expect("Could not write reverts to DB");
        provider.write_state_reverts(reverts, 2, StateWriteConfig::default()).expect("Could not write reverts to DB");

        // Check new plain state for account B
        assert_eq!(
@@ -280,7 +282,7 @@ mod tests {

        let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        // Check plain storage state
@@ -380,7 +382,7 @@ mod tests {
        state.merge_transitions(BundleRetention::Reverts);
        let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 2, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        assert_eq!(
@@ -448,7 +450,7 @@ mod tests {
        let outcome =
            ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        let mut state = State::builder().with_bundle_update().build();
@@ -607,7 +609,7 @@ mod tests {
        let outcome: ExecutionOutcome =
            ExecutionOutcome::new(bundle, Default::default(), 1, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        let mut storage_changeset_cursor = provider
@@ -773,7 +775,7 @@ mod tests {
        let outcome =
            ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        let mut state = State::builder().with_bundle_update().build();
@@ -822,7 +824,7 @@ mod tests {
        state.merge_transitions(BundleRetention::Reverts);
        let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new());
        provider
            .write_state(&outcome, OriginalValuesKnown::Yes)
            .write_state(&outcome, OriginalValuesKnown::Yes, StateWriteConfig::default())
            .expect("Could not write bundle state to DB");

        let mut storage_changeset_cursor = provider

@@ -12,21 +12,26 @@ pub trait StateWriter {
    /// Receipt type included into [`ExecutionOutcome`].
    type Receipt;

    /// Write the state and receipts to the database or static files if `static_file_producer` is
    /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
    /// Write the state and optionally receipts to the database.
    ///
    /// Use `config` to skip writing certain data types when they are written elsewhere.
    fn write_state(
        &self,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()>;

    /// Write state reverts to the database.
    ///
    /// NOTE: Reverts will delete all wiped storage from plain state.
    ///
    /// Use `config` to skip writing certain data types when they are written elsewhere.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()>;

    /// Write state changes to the database.
@@ -46,3 +51,20 @@ pub trait StateWriter {
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>>;
}

/// Configuration for what to write when calling [`StateWriter::write_state`].
///
/// Used to skip writing certain data types when they are being written separately.
#[derive(Debug, Clone, Copy)]
pub struct StateWriteConfig {
    /// Whether to write receipts.
    pub write_receipts: bool,
    /// Whether to write account changesets.
    pub write_account_changesets: bool,
}

impl Default for StateWriteConfig {
    fn default() -> Self {
        Self { write_receipts: true, write_account_changesets: true }
    }
}
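
// A small usage sketch: `StateWriteConfig` is `Copy` and defaults to writing everything, so a
// caller that persists receipts to static files separately might disable only that flag via
// struct-update syntax (the surrounding `provider.write_state` call mirrors the test code
// above and is illustrative):
//
//     let config = StateWriteConfig { write_receipts: false, ..Default::default() };
//     provider.write_state(&outcome, OriginalValuesKnown::Yes, config)?;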

@@ -17,7 +17,7 @@ use reth_primitives_traits::{Block as BlockTrait, RecoveredBlock, SealedBlock};
use reth_provider::{
    test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, DatabaseProviderFactory,
    ExecutionOutcome, HeaderProvider, HistoryWriter, OriginalValuesKnown, StateProofProvider,
    StateWriter, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
    StateWriteConfig, StateWriter, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
};
use reth_revm::{database::StateProviderDatabase, witness::ExecutionWitnessRecord, State};
use reth_stateless::{
@@ -325,7 +325,11 @@ fn run_case(

    // Commit the post state/state diff to the database
    provider
        .write_state(&ExecutionOutcome::single(block.number, output), OriginalValuesKnown::Yes)
        .write_state(
            &ExecutionOutcome::single(block.number, output),
            OriginalValuesKnown::Yes,
            StateWriteConfig::default(),
        )
        .map_err(|err| Error::block_failed(block_number, program_inputs.clone(), err))?;

    provider