mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-28 00:28:20 -05:00
chore: use readonly providers in StorageWriter (#9822)
This commit is contained in:
@@ -143,8 +143,8 @@ impl<DB: Database> PersistenceService<DB> {
|
||||
let provider = self.provider.static_file_provider();
|
||||
|
||||
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
|
||||
let provider_rw = self.provider.provider_rw()?;
|
||||
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(header_writer));
|
||||
let provider_ro = self.provider.provider()?;
|
||||
let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(header_writer));
|
||||
storage_writer.append_headers_from_blocks(
|
||||
block.header().number,
|
||||
std::iter::once(&(block.header(), block.hash())),
|
||||
@@ -152,7 +152,7 @@ impl<DB: Database> PersistenceService<DB> {
|
||||
|
||||
let transactions_writer =
|
||||
provider.get_writer(block.number, StaticFileSegment::Transactions)?;
|
||||
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(transactions_writer));
|
||||
let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(transactions_writer));
|
||||
let no_hash_transactions =
|
||||
block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect();
|
||||
storage_writer.append_transactions_from_blocks(
|
||||
@@ -212,7 +212,7 @@ impl<DB: Database> PersistenceService<DB> {
|
||||
fn remove_static_file_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
|
||||
debug!(target: "tree::persistence", ?block_number, "Removing static file blocks above block_number");
|
||||
let sf_provider = self.provider.static_file_provider();
|
||||
let db_provider_rw = self.provider.provider_rw()?;
|
||||
let db_provider_ro = self.provider.provider()?;
|
||||
|
||||
// get highest static file block for the total block range
|
||||
let highest_static_file_block = sf_provider
|
||||
@@ -221,7 +221,7 @@ impl<DB: Database> PersistenceService<DB> {
|
||||
|
||||
// Get the total txs for the block range, so we have the correct number of columns for
|
||||
// receipts and transactions
|
||||
let tx_range = db_provider_rw
|
||||
let tx_range = db_provider_ro
|
||||
.transaction_range_by_block_range(block_number..=highest_static_file_block)?;
|
||||
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
providers::StaticFileProviderRWRefMut, DatabaseProviderRW, StateChangeWriter, StateWriter,
|
||||
TrieWriter,
|
||||
providers::StaticFileProviderRWRefMut, DatabaseProvider, DatabaseProviderRO,
|
||||
DatabaseProviderRW, StateChangeWriter, StateWriter, TrieWriter,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use reth_db::{
|
||||
@@ -33,29 +33,24 @@ enum StorageType<C = (), S = ()> {
|
||||
/// [`StorageWriter`] is responsible for managing the writing to either database, static file or
|
||||
/// both.
|
||||
#[derive(Debug)]
|
||||
pub struct StorageWriter<'a, 'b, DB: Database> {
|
||||
database_writer: Option<&'a DatabaseProviderRW<DB>>,
|
||||
pub struct StorageWriter<'a, 'b, TX> {
|
||||
database_writer: Option<&'a DatabaseProvider<TX>>,
|
||||
static_file_writer: Option<StaticFileProviderRWRefMut<'b>>,
|
||||
}
|
||||
|
||||
impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
|
||||
impl<'a, 'b, TX> StorageWriter<'a, 'b, TX> {
|
||||
/// Creates a new instance of [`StorageWriter`].
|
||||
///
|
||||
/// # Parameters
|
||||
/// - `database_writer`: An optional reference to a database writer.
|
||||
/// - `static_file_writer`: An optional mutable reference to a static file writer.
|
||||
pub const fn new(
|
||||
database_writer: Option<&'a DatabaseProviderRW<DB>>,
|
||||
database_writer: Option<&'a DatabaseProvider<TX>>,
|
||||
static_file_writer: Option<StaticFileProviderRWRefMut<'b>>,
|
||||
) -> Self {
|
||||
Self { database_writer, static_file_writer }
|
||||
}
|
||||
|
||||
/// Creates a new instance of [`StorageWriter`] from a database writer.
|
||||
pub const fn from_database_writer(database_writer: &'a DatabaseProviderRW<DB>) -> Self {
|
||||
Self::new(Some(database_writer), None)
|
||||
}
|
||||
|
||||
/// Creates a new instance of [`StorageWriter`] from a static file writer.
|
||||
pub const fn from_static_file_writer(
|
||||
static_file_writer: StaticFileProviderRWRefMut<'b>,
|
||||
@@ -63,11 +58,31 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
|
||||
Self::new(None, Some(static_file_writer))
|
||||
}
|
||||
|
||||
/// Creates a new instance of [`StorageWriter`] from a read-only database provider.
|
||||
pub const fn from_database_provider_ro<DB>(
|
||||
database: &'a DatabaseProviderRO<DB>,
|
||||
) -> StorageWriter<'_, '_, <DB as Database>::TX>
|
||||
where
|
||||
DB: Database,
|
||||
{
|
||||
StorageWriter::new(Some(database), None)
|
||||
}
|
||||
|
||||
/// Creates a new instance of [`StorageWriter`] from a read-write database provider.
|
||||
pub fn from_database_provider_rw<DB>(
|
||||
database: &'a DatabaseProviderRW<DB>,
|
||||
) -> StorageWriter<'_, '_, <DB as Database>::TXMut>
|
||||
where
|
||||
DB: Database,
|
||||
{
|
||||
StorageWriter::new(Some(database), None)
|
||||
}
|
||||
|
||||
/// Returns a reference to the database writer.
|
||||
///
|
||||
/// # Panics
|
||||
/// If the database writer is not set.
|
||||
fn database_writer(&self) -> &DatabaseProviderRW<DB> {
|
||||
fn database_writer(&self) -> &DatabaseProvider<TX> {
|
||||
self.database_writer.as_ref().expect("should exist")
|
||||
}
|
||||
|
||||
@@ -102,50 +117,12 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes the hashed state changes to the database
|
||||
pub fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
|
||||
self.ensure_database_writer()?;
|
||||
|
||||
// Write hashed account updates.
|
||||
let mut hashed_accounts_cursor =
|
||||
self.database_writer().tx_ref().cursor_write::<tables::HashedAccounts>()?;
|
||||
for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
|
||||
if let Some(account) = account {
|
||||
hashed_accounts_cursor.upsert(hashed_address, account)?;
|
||||
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
|
||||
hashed_accounts_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
|
||||
// Write hashed storage changes.
|
||||
let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
|
||||
let mut hashed_storage_cursor =
|
||||
self.database_writer().tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
|
||||
for (hashed_address, storage) in sorted_storages {
|
||||
if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
|
||||
hashed_storage_cursor.delete_current_duplicates()?;
|
||||
}
|
||||
|
||||
for (hashed_slot, value) in storage.storage_slots_sorted() {
|
||||
let entry = StorageEntry { key: hashed_slot, value };
|
||||
if let Some(db_entry) =
|
||||
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
|
||||
{
|
||||
if db_entry.key == entry.key {
|
||||
hashed_storage_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
|
||||
if !entry.value.is_zero() {
|
||||
hashed_storage_cursor.upsert(*hashed_address, entry)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl<'a, 'b, TX> StorageWriter<'a, 'b, TX>
|
||||
where
|
||||
TX: DbTx,
|
||||
{
|
||||
/// Appends headers to static files, using the
|
||||
/// [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) table to determine the
|
||||
/// total difficulty of the parent block during header insertion.
|
||||
@@ -236,6 +213,54 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'b, TX> StorageWriter<'a, 'b, TX>
|
||||
where
|
||||
TX: DbTxMut + DbTx,
|
||||
{
|
||||
/// Writes the hashed state changes to the database
|
||||
pub fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
|
||||
self.ensure_database_writer()?;
|
||||
|
||||
// Write hashed account updates.
|
||||
let mut hashed_accounts_cursor =
|
||||
self.database_writer().tx_ref().cursor_write::<tables::HashedAccounts>()?;
|
||||
for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
|
||||
if let Some(account) = account {
|
||||
hashed_accounts_cursor.upsert(hashed_address, account)?;
|
||||
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
|
||||
hashed_accounts_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
|
||||
// Write hashed storage changes.
|
||||
let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
|
||||
let mut hashed_storage_cursor =
|
||||
self.database_writer().tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
|
||||
for (hashed_address, storage) in sorted_storages {
|
||||
if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
|
||||
hashed_storage_cursor.delete_current_duplicates()?;
|
||||
}
|
||||
|
||||
for (hashed_slot, value) in storage.storage_slots_sorted() {
|
||||
let entry = StorageEntry { key: hashed_slot, value };
|
||||
if let Some(db_entry) =
|
||||
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
|
||||
{
|
||||
if db_entry.key == entry.key {
|
||||
hashed_storage_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
|
||||
if !entry.value.is_zero() {
|
||||
hashed_storage_cursor.upsert(*hashed_address, entry)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends receipts block by block.
|
||||
///
|
||||
@@ -322,7 +347,10 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'b, DB: Database> StateWriter for StorageWriter<'a, 'b, DB> {
|
||||
impl<'a, 'b, TX> StateWriter for StorageWriter<'a, 'b, TX>
|
||||
where
|
||||
TX: DbTxMut + DbTx,
|
||||
{
|
||||
/// Write the data and receipts to the database or static files if `static_file_producer` is
|
||||
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
|
||||
fn write_to_storage(
|
||||
|
||||
Reference in New Issue
Block a user