mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
chore(db): Simplifications to trie-related storage-api methods (#18579)
This commit is contained in:
@@ -82,7 +82,6 @@ where
|
||||
vec![block.clone()],
|
||||
&execution_outcome,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)?;
|
||||
provider_rw.commit()?;
|
||||
|
||||
@@ -216,7 +215,6 @@ where
|
||||
vec![block1.clone(), block2.clone()],
|
||||
&execution_outcome,
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)?;
|
||||
provider_rw.commit()?;
|
||||
|
||||
|
||||
@@ -408,7 +408,7 @@ mod tests {
|
||||
use reth_provider::{BlockWriter, ProviderFactory, StaticFileProviderFactory};
|
||||
use reth_stages_api::StageUnitCheckpoint;
|
||||
use reth_testing_utils::generators::{self, random_header, random_header_range};
|
||||
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
|
||||
use reth_trie::HashedPostStateSorted;
|
||||
use std::sync::Arc;
|
||||
use test_runner::HeadersTestRunner;
|
||||
|
||||
@@ -651,7 +651,6 @@ mod tests {
|
||||
sealed_blocks,
|
||||
&ExecutionOutcome::default(),
|
||||
HashedPostStateSorted::default(),
|
||||
TrieUpdates::default(),
|
||||
)
|
||||
.unwrap();
|
||||
provider.commit().unwrap();
|
||||
|
||||
@@ -1702,7 +1702,6 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)?;
|
||||
provider_rw.commit()?;
|
||||
|
||||
|
||||
@@ -1769,7 +1769,6 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
)?;
|
||||
provider_rw.commit()?;
|
||||
|
||||
|
||||
@@ -36,9 +36,6 @@ impl DurationsRecorder {
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub(crate) enum Action {
|
||||
InsertStorageHashing,
|
||||
InsertAccountHashing,
|
||||
InsertMerkleTree,
|
||||
InsertBlock,
|
||||
InsertState,
|
||||
InsertHashes,
|
||||
@@ -58,12 +55,6 @@ pub(crate) enum Action {
|
||||
#[derive(Metrics)]
|
||||
#[metrics(scope = "storage.providers.database")]
|
||||
struct DatabaseProviderMetrics {
|
||||
/// Duration of insert storage hashing
|
||||
insert_storage_hashing: Histogram,
|
||||
/// Duration of insert account hashing
|
||||
insert_account_hashing: Histogram,
|
||||
/// Duration of insert merkle tree
|
||||
insert_merkle_tree: Histogram,
|
||||
/// Duration of insert block
|
||||
insert_block: Histogram,
|
||||
/// Duration of insert state
|
||||
@@ -96,9 +87,6 @@ impl DatabaseProviderMetrics {
|
||||
/// Records the duration for the given action.
|
||||
pub(crate) fn record_duration(&self, action: Action, duration: Duration) {
|
||||
match action {
|
||||
Action::InsertStorageHashing => self.insert_storage_hashing.record(duration),
|
||||
Action::InsertAccountHashing => self.insert_account_hashing.record(duration),
|
||||
Action::InsertMerkleTree => self.insert_merkle_tree.record(duration),
|
||||
Action::InsertBlock => self.insert_block.record(duration),
|
||||
Action::InsertState => self.insert_state.record(duration),
|
||||
Action::InsertHashes => self.insert_hashes.record(duration),
|
||||
|
||||
@@ -2336,7 +2336,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
|
||||
}
|
||||
}
|
||||
|
||||
num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
|
||||
num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref().iter())?;
|
||||
|
||||
Ok(num_entries)
|
||||
}
|
||||
@@ -2345,12 +2345,12 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
|
||||
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
|
||||
/// Writes storage trie updates from the given storage trie map. First sorts the storage trie
|
||||
/// updates by the hashed address, writing in sorted order.
|
||||
fn write_storage_trie_updates(
|
||||
fn write_storage_trie_updates<'a>(
|
||||
&self,
|
||||
storage_tries: &B256Map<StorageTrieUpdates>,
|
||||
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
|
||||
) -> ProviderResult<usize> {
|
||||
let mut num_entries = 0;
|
||||
let mut storage_tries = Vec::from_iter(storage_tries);
|
||||
let mut storage_tries = storage_tries.collect::<Vec<_>>();
|
||||
storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
|
||||
let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
for (hashed_address, storage_trie_updates) in storage_tries {
|
||||
@@ -2363,20 +2363,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
|
||||
|
||||
Ok(num_entries)
|
||||
}
|
||||
|
||||
fn write_individual_storage_trie_updates(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
updates: &StorageTrieUpdates,
|
||||
) -> ProviderResult<usize> {
|
||||
if updates.is_empty() {
|
||||
return Ok(0)
|
||||
}
|
||||
|
||||
let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
|
||||
Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
|
||||
@@ -2526,82 +2512,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvi
|
||||
|
||||
Ok(hashed_storage_keys)
|
||||
}
|
||||
|
||||
fn insert_hashes(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
end_block_hash: B256,
|
||||
expected_state_root: B256,
|
||||
) -> ProviderResult<()> {
|
||||
// Initialize prefix sets.
|
||||
let mut account_prefix_set = PrefixSetMut::default();
|
||||
let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
|
||||
let mut destroyed_accounts = HashSet::default();
|
||||
|
||||
let mut durations_recorder = metrics::DurationsRecorder::default();
|
||||
|
||||
// storage hashing stage
|
||||
{
|
||||
let lists = self.changed_storages_with_range(range.clone())?;
|
||||
let storages = self.plain_state_storages(lists)?;
|
||||
let storage_entries = self.insert_storage_for_hashing(storages)?;
|
||||
for (hashed_address, hashed_slots) in storage_entries {
|
||||
account_prefix_set.insert(Nibbles::unpack(hashed_address));
|
||||
for slot in hashed_slots {
|
||||
storage_prefix_sets
|
||||
.entry(hashed_address)
|
||||
.or_default()
|
||||
.insert(Nibbles::unpack(slot));
|
||||
}
|
||||
}
|
||||
}
|
||||
durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
|
||||
|
||||
// account hashing stage
|
||||
{
|
||||
let lists = self.changed_accounts_with_range(range.clone())?;
|
||||
let accounts = self.basic_accounts(lists)?;
|
||||
let hashed_addresses = self.insert_account_for_hashing(accounts)?;
|
||||
for (hashed_address, account) in hashed_addresses {
|
||||
account_prefix_set.insert(Nibbles::unpack(hashed_address));
|
||||
if account.is_none() {
|
||||
destroyed_accounts.insert(hashed_address);
|
||||
}
|
||||
}
|
||||
}
|
||||
durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
|
||||
|
||||
// merkle tree
|
||||
{
|
||||
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
|
||||
// are pre-loaded.
|
||||
let prefix_sets = TriePrefixSets {
|
||||
account_prefix_set: account_prefix_set.freeze(),
|
||||
storage_prefix_sets: storage_prefix_sets
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.freeze()))
|
||||
.collect(),
|
||||
destroyed_accounts,
|
||||
};
|
||||
let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
|
||||
.with_prefix_sets(prefix_sets)
|
||||
.root_with_updates()
|
||||
.map_err(reth_db_api::DatabaseError::from)?;
|
||||
if state_root != expected_state_root {
|
||||
return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
|
||||
root: GotExpected { got: state_root, expected: expected_state_root },
|
||||
block_number: *range.end(),
|
||||
block_hash: end_block_hash,
|
||||
})))
|
||||
}
|
||||
self.write_trie_updates(&trie_updates)?;
|
||||
}
|
||||
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
|
||||
|
||||
debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
|
||||
@@ -3048,7 +2958,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
|
||||
blocks: Vec<RecoveredBlock<Self::Block>>,
|
||||
execution_outcome: &ExecutionOutcome<Self::Receipt>,
|
||||
hashed_state: HashedPostStateSorted,
|
||||
trie_updates: TrieUpdates,
|
||||
) -> ProviderResult<()> {
|
||||
if blocks.is_empty() {
|
||||
debug!(target: "providers::db", "Attempted to append empty block range");
|
||||
@@ -3076,7 +2985,6 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
|
||||
|
||||
// insert hashes and intermediate merkle nodes
|
||||
self.write_hashed_state(&hashed_state)?;
|
||||
self.write_trie_updates(&trie_updates)?;
|
||||
durations_recorder.record_relative(metrics::Action::InsertHashes);
|
||||
|
||||
self.update_history_indices(first_number..=last_block_number)?;
|
||||
|
||||
@@ -1354,7 +1354,7 @@ mod tests {
|
||||
assert_eq!(storage_root, storage_root_prehashed(init_storage.storage));
|
||||
assert!(!storage_updates.is_empty());
|
||||
provider_rw
|
||||
.write_individual_storage_trie_updates(hashed_address, &storage_updates)
|
||||
.write_storage_trie_updates(core::iter::once((&hashed_address, &storage_updates)))
|
||||
.unwrap();
|
||||
|
||||
// destroy the storage and re-create with new slots
|
||||
|
||||
@@ -5,7 +5,7 @@ use reth_db_models::StoredBlockBodyIndices;
|
||||
use reth_execution_types::{Chain, ExecutionOutcome};
|
||||
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
use reth_trie_common::{updates::TrieUpdates, HashedPostStateSorted};
|
||||
use reth_trie_common::HashedPostStateSorted;
|
||||
|
||||
/// `BlockExecution` Writer
|
||||
pub trait BlockExecutionWriter:
|
||||
@@ -107,7 +107,7 @@ pub trait BlockWriter: Send + Sync {
|
||||
/// updates the post-state.
|
||||
///
|
||||
/// Inserts the blocks into the database and updates the state with
|
||||
/// provided `BundleState`.
|
||||
/// provided `BundleState`. The database's trie state is _not_ updated.
|
||||
///
|
||||
/// # Parameters
|
||||
///
|
||||
@@ -122,6 +122,5 @@ pub trait BlockWriter: Send + Sync {
|
||||
blocks: Vec<RecoveredBlock<Self::Block>>,
|
||||
execution_outcome: &ExecutionOutcome<Self::Receipt>,
|
||||
hashed_state: HashedPostStateSorted,
|
||||
trie_updates: TrieUpdates,
|
||||
) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use alloc::collections::{BTreeMap, BTreeSet};
|
||||
use alloy_primitives::{map::HashMap, Address, BlockNumber, B256};
|
||||
use auto_impl::auto_impl;
|
||||
use core::ops::{RangeBounds, RangeInclusive};
|
||||
use core::ops::RangeBounds;
|
||||
use reth_db_api::models::BlockNumberAddress;
|
||||
use reth_db_models::AccountBeforeTx;
|
||||
use reth_primitives_traits::{Account, StorageEntry};
|
||||
@@ -69,17 +69,4 @@ pub trait HashingWriter: Send + Sync {
|
||||
&self,
|
||||
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
|
||||
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>>;
|
||||
|
||||
/// Calculate the hashes of all changed accounts and storages, and finally calculate the state
|
||||
/// root.
|
||||
///
|
||||
/// The hashes are calculated from `fork_block_number + 1` to `current_block_number`.
|
||||
///
|
||||
/// The resulting state root is compared with `expected_state_root`.
|
||||
fn insert_hashes(
|
||||
&self,
|
||||
range: RangeInclusive<BlockNumber>,
|
||||
end_block_hash: B256,
|
||||
expected_state_root: B256,
|
||||
) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use alloc::vec::Vec;
|
||||
use alloy_primitives::{map::B256Map, Address, Bytes, B256};
|
||||
use alloy_primitives::{Address, Bytes, B256};
|
||||
use reth_storage_errors::provider::ProviderResult;
|
||||
use reth_trie_common::{
|
||||
updates::{StorageTrieUpdates, TrieUpdates},
|
||||
@@ -106,15 +106,8 @@ pub trait StorageTrieWriter: Send + Sync {
|
||||
/// First sorts the storage trie updates by the hashed address key, writing in sorted order.
|
||||
///
|
||||
/// Returns the number of entries modified.
|
||||
fn write_storage_trie_updates(
|
||||
fn write_storage_trie_updates<'a>(
|
||||
&self,
|
||||
storage_tries: &B256Map<StorageTrieUpdates>,
|
||||
) -> ProviderResult<usize>;
|
||||
|
||||
/// Writes storage trie updates for the given hashed address.
|
||||
fn write_individual_storage_trie_updates(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
updates: &StorageTrieUpdates,
|
||||
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
|
||||
) -> ProviderResult<usize>;
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ fn incremental_vs_full_root(inputs: &[&str], modified: &str) {
|
||||
let modified_root = loader.root().unwrap();
|
||||
|
||||
// Update the intermediate roots table so that we can run the incremental verification
|
||||
tx.write_individual_storage_trie_updates(hashed_address, &trie_updates).unwrap();
|
||||
tx.write_storage_trie_updates(core::iter::once((&hashed_address, &trie_updates))).unwrap();
|
||||
|
||||
// 3. Calculate the incremental root
|
||||
let mut storage_changes = PrefixSetMut::default();
|
||||
|
||||
Reference in New Issue
Block a user