chore(trie): Use trie changesets for engine unwinding (#18878)

Brian Picciano
2025-10-07 17:59:42 +02:00
committed by GitHub
parent 7381462e84
commit c4e174673d
15 changed files with 348 additions and 208 deletions

View File

@@ -15,10 +15,7 @@ use reth_db::DatabaseEnv;
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::ConfigureEvm;
use reth_exex::ExExManagerHandle;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory,
};
use reth_provider::{providers::ProviderNodeTypes, BlockNumReader, ProviderFactory};
use reth_stages::{
sets::{DefaultStages, OfflineStages},
stages::ExecutionStage,
@@ -60,55 +57,22 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
let components = components(provider_factory.chain_spec());
let highest_static_file_block = provider_factory
.static_file_provider()
.get_highest_static_files()
.max_block_num()
.filter(|highest_static_file_block| *highest_static_file_block > target);
// Execute a pipeline unwind if the start of the range overlaps the existing static
// files. In that case, copy all available data from MDBX to static files first, and
// only then proceed with the unwind.
//
// We also execute a pipeline unwind if `offline` is specified, because we need to
// unwind only the data associated with offline stages.
if highest_static_file_block.is_some() || self.offline {
if self.offline {
info!(target: "reth::cli", "Performing an unwind for offline-only data!");
}
if let Some(highest_static_file_block) = highest_static_file_block {
info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
} else {
info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
}
info!(target: "reth::cli", prune_config=?config.prune, "Using prune settings");
// This will build an offline-only pipeline if the `offline` flag is enabled
let mut pipeline =
self.build_pipeline(config, provider_factory, components.evm_config().clone())?;
// Move all applicable data from database to static files.
pipeline.move_to_static_files()?;
pipeline.unwind(target, None)?;
} else {
info!(target: "reth::cli", ?target, "Executing a database unwind.");
let provider = provider_factory.provider_rw()?;
provider
.remove_block_and_execution_above(target)
.map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
// update finalized block if needed
let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
if last_saved_finalized_block_number.is_none_or(|f| f > target) {
provider.save_finalized_block_number(target)?;
}
provider.commit()?;
if self.offline {
info!(target: "reth::cli", "Performing an unwind for offline-only data!");
}
let highest_static_file_block = provider_factory.provider()?.last_block_number()?;
info!(target: "reth::cli", ?target, ?highest_static_file_block, prune_config=?config.prune, "Executing a pipeline unwind.");
// This will build an offline-only pipeline if the `offline` flag is enabled
let mut pipeline =
self.build_pipeline(config, provider_factory, components.evm_config().clone())?;
// Move all applicable data from database to static files.
pipeline.move_to_static_files()?;
pipeline.unwind(target, None)?;
info!(target: "reth::cli", ?target, "Unwound blocks");
Ok(())
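
The deleted branch above can be summarized abstractly: the old command chose between a pipeline unwind and a plain database unwind, while the new code always takes the pipeline path. A minimal sketch of that decision, with a hypothetical `unwind_path` helper standing in for the command logic (not reth code):

```rust
/// Models the unwind path selection; `overlaps_static_files` stands in for the
/// old `highest_static_file_block` check.
fn unwind_path(after_this_commit: bool, overlaps_static_files: bool, offline: bool) -> &'static str {
    if after_this_commit {
        // Trie changesets make the pipeline unwind applicable unconditionally.
        "pipeline unwind"
    } else if overlaps_static_files || offline {
        "pipeline unwind"
    } else {
        "database unwind"
    }
}

fn main() {
    assert_eq!(unwind_path(false, false, false), "database unwind");
    assert_eq!(unwind_path(true, false, false), "pipeline unwind");
}
```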

View File

@@ -91,7 +91,6 @@ async fn e2e_test_send_transactions() -> eyre::Result<()> {
Ok(())
}
#[ignore] // TODO(mediocregopher): re-enable as part of https://github.com/paradigmxyz/reth/issues/18517
#[tokio::test]
async fn test_long_reorg() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

View File

@@ -165,7 +165,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
db.insert_changesets(transitions, None).unwrap();
let provider_rw = db.factory.provider_rw().unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.write_trie_updates(updates).unwrap();
provider_rw.commit().unwrap();
let (transitions, final_state) = random_changeset_range(

View File

@@ -247,7 +247,7 @@ where
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
provider.write_trie_updates(&updates)?;
provider.write_trie_updates(updates)?;
let mut checkpoint = MerkleCheckpoint::new(
to_block,
@@ -290,7 +290,7 @@ where
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
provider.write_trie_updates(&updates)?;
provider.write_trie_updates(updates)?;
entities_checkpoint.processed += hashed_entries_walked as u64;
@@ -317,7 +317,7 @@ where
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
provider.write_trie_updates(&updates)?;
provider.write_trie_updates(updates)?;
final_root = Some(root);
}
@@ -400,7 +400,7 @@ where
validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;
// Validation passed, apply unwind changes to the database.
provider.write_trie_updates(&updates)?;
provider.write_trie_updates(updates)?;
// Update entities checkpoint to reflect the unwind operation
// Since we're unwinding, we need to recalculate the total entities at the target block

View File

@@ -602,7 +602,7 @@ where
match state_root.root_with_progress()? {
StateRootProgress::Progress(state, _, updates) => {
let updated_len = provider.write_trie_updates(&updates)?;
let updated_len = provider.write_trie_updates(updates)?;
total_flushed_updates += updated_len;
trace!(target: "reth::cli",
@@ -622,7 +622,7 @@ where
}
}
StateRootProgress::Complete(root, _, updates) => {
let updated_len = provider.write_trie_updates(&updates)?;
let updated_len = provider.write_trie_updates(updates)?;
total_flushed_updates += updated_len;
trace!(target: "reth::cli",
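
Both arms above follow the same flush-as-you-go pattern around `root_with_progress`. Below is a self-contained sketch of that loop with simplified stand-ins for `StateRootProgress` and the update batches (assumed shapes, not the reth types):

```rust
/// Simplified stand-in for `reth_trie::StateRootProgress`.
enum Progress {
    /// An intermediate batch of trie updates; the computation is not finished.
    Partial(Vec<(Vec<u8>, u64)>),
    /// The final root together with the last batch of updates.
    Complete([u8; 32], Vec<(Vec<u8>, u64)>),
}

/// Drives the computation, flushing each batch as it is produced; returns the
/// final root and the total number of flushed entries.
fn drive(
    mut step: impl FnMut() -> Progress,
    mut flush: impl FnMut(Vec<(Vec<u8>, u64)>) -> usize,
) -> ([u8; 32], usize) {
    let mut total_flushed_updates = 0;
    loop {
        match step() {
            Progress::Partial(updates) => total_flushed_updates += flush(updates),
            Progress::Complete(root, updates) => {
                total_flushed_updates += flush(updates);
                return (root, total_flushed_updates);
            }
        }
    }
}

fn main() {
    let mut batches = vec![
        Progress::Complete([0u8; 32], vec![(vec![0x1], 7)]),
        Progress::Partial(vec![(vec![0x0], 3)]),
    ];
    let (root, flushed) = drive(|| batches.pop().unwrap(), |updates| updates.len());
    assert_eq!(root, [0u8; 32]);
    assert_eq!(flushed, 2);
}
```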

View File

@@ -49,8 +49,7 @@ use reth_db_api::{
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
Account, Block as _, BlockBody as _, Bytecode, GotExpected, RecoveredBlock, SealedHeader,
StorageEntry,
Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
@@ -61,15 +60,14 @@ use reth_storage_api::{
BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorIter},
updates::{StorageTrieUpdates, StorageTrieUpdatesSorted, TrieUpdates, TrieUpdatesSorted},
BranchNodeCompact, HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
StoredNibblesSubKey, TrieChangeSetsEntry,
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
BranchNodeCompact, HashedPostStateSorted, Nibbles, StoredNibbles, StoredNibblesSubKey,
TrieChangeSetsEntry,
};
use reth_trie_db::{DatabaseAccountTrieCursor, DatabaseStateRoot, DatabaseStorageTrieCursor};
use reth_trie_db::{DatabaseAccountTrieCursor, DatabaseStorageTrieCursor};
use revm_database::states::{
PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
@@ -305,13 +303,9 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
trie.take_present().ok_or(ProviderError::MissingTrieUpdates(block_hash))?;
// sort trie updates and insert changesets
// TODO(mediocregopher): We should rework `write_trie_updates` to also accept a
// `TrieUpdatesSorted`, and then change the `trie` field of
// `ExecutedBlockWithTrieUpdates` to carry a `TrieUpdatesSorted`.
let trie_updates_sorted = (*trie_updates).clone().into_sorted();
self.write_trie_changesets(block_number, &trie_updates_sorted, None)?;
self.write_trie_updates(&trie_updates)?;
self.write_trie_updates_sorted(&trie_updates_sorted)?;
}
// update history indices
@@ -325,89 +319,42 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok(())
}
/// Unwinds trie state for the given range.
/// Unwinds trie state starting at and including the given block.
///
/// This includes calculating the resulting state root and comparing it with the parent
/// block state root.
pub fn unwind_trie_state_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.walk_range(from..)?
.collect::<Result<Vec<_>, _>>()?;
// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
// Unwind account hashes.
self.unwind_account_hashing(changed_accounts.iter())?;
// Unwind account history indices.
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let storage_start = BlockNumberAddress((from, Address::ZERO));
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.walk_range(storage_start..)?
.collect::<Result<Vec<_>, _>>()?;
// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
// Unwind storage hashes.
self.unwind_storage_hashing(changed_storages.iter().copied())?;
// Unwind storage history indices.
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, except that the
// prefix sets are pre-loaded.
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(reth_db_api::DatabaseError::from)?;
// Unwind accounts/storages trie tables using the revert.
let trie_revert = self.trie_reverts(from)?;
self.write_trie_updates_sorted(&trie_revert)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root();
// The state root should always be correct as we are reverting state,
// but for the sake of double verification we check it again.
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
// Clear trie changesets which have been unwound.
self.clear_trie_changesets_from(from)?;
Ok(())
}
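
The mechanics behind `trie_reverts` and `clear_trie_changesets_from`, and their pairing with the `write_trie_changesets` / `write_trie_updates_sorted` calls earlier in this file, can be illustrated with a self-contained model (simplified key and node types, not the reth implementation): every write first records each touched node's previous value into a per-block changeset, and unwinding replays those previous values from the highest block down before dropping the consumed changesets.

```rust
use std::collections::BTreeMap;

// Stand-ins for Nibbles and BranchNodeCompact; the real types live in reth_trie.
type Key = Vec<u8>;
type Node = u64;

#[derive(Default)]
struct TrieDb {
    nodes: BTreeMap<Key, Node>,
    /// block number -> (key, value before that block; `None` if absent before).
    changesets: BTreeMap<u64, Vec<(Key, Option<Node>)>>,
}

impl TrieDb {
    /// Analogue of `write_trie_changesets` followed by `write_trie_updates_sorted`:
    /// record the prior value of every touched node, then apply the update.
    fn write_block(&mut self, block: u64, updates: Vec<(Key, Option<Node>)>) {
        let changeset = self.changesets.entry(block).or_default();
        for (key, new) in updates {
            changeset.push((key.clone(), self.nodes.get(&key).copied()));
            match new {
                Some(node) => { self.nodes.insert(key, node); }
                None => { self.nodes.remove(&key); }
            }
        }
    }

    /// Analogue of `unwind_trie_state_from`: replay recorded prior values from
    /// the highest block down so the earliest snapshot wins, then clear the
    /// consumed changesets.
    fn unwind_from(&mut self, from: u64) {
        let blocks: Vec<u64> = self.changesets.range(from..).map(|(b, _)| *b).collect();
        for block in blocks.into_iter().rev() {
            for (key, prev) in self.changesets.remove(&block).unwrap().into_iter().rev() {
                match prev {
                    Some(node) => { self.nodes.insert(key, node); }
                    None => { self.nodes.remove(&key); }
                }
            }
        }
    }
}

fn main() {
    let mut db = TrieDb::default();
    db.write_block(1, vec![(vec![0x1], Some(10))]);
    db.write_block(2, vec![(vec![0x1], Some(20)), (vec![0x2], Some(30))]);
    db.unwind_from(2);
    assert_eq!(db.nodes.get(&vec![0x1]), Some(&10)); // block 2's change reverted
    assert_eq!(db.nodes.get(&vec![0x2]), None);      // node created in block 2 removed
}
```

This is why the new unwind no longer needs to recompute and verify the state root: the changesets already contain the exact pre-images being restored.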
@@ -2150,8 +2097,10 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
/// Writes trie updates. Returns the number of entries modified.
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
/// Writes already-sorted trie updates to the database.
///
/// Returns the number of entries modified.
fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
if trie_updates.is_empty() {
return Ok(0)
}
@@ -2159,23 +2108,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
// Track the number of inserted entries.
let mut num_entries = 0;
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut account_updates = trie_updates
.removed_nodes_ref()
.iter()
.filter_map(|n| {
(!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
})
.collect::<Vec<_>>();
account_updates.extend(
trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
);
// Sort trie node updates.
account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let tx = self.tx_ref();
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
for (key, updated_node) in account_updates {
// Process sorted account nodes
for (key, updated_node) in &trie_updates.account_nodes {
let nibbles = StoredNibbles(*key);
match updated_node {
Some(node) => {
@@ -2193,16 +2130,17 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider
}
}
num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref().iter())?;
num_entries +=
self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
Ok(num_entries)
}
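
The block deleted above is the old merge-and-sort step that the sorted representation makes unnecessary. As a reference point, here is a self-contained sketch of what it computed (simplified key and node types):

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for Nibbles / BranchNodeCompact.
type Key = Vec<u8>;
type Node = u64;

/// What the removed code computed: removals and updates merged into a single
/// key-ordered list, with updates taking precedence over removals.
fn merge_and_sort(
    account_nodes: &HashMap<Key, Node>,
    removed_nodes: &HashSet<Key>,
) -> Vec<(Key, Option<Node>)> {
    let mut merged: Vec<(Key, Option<Node>)> = removed_nodes
        .iter()
        .filter(|key| !account_nodes.contains_key(*key))
        .map(|key| (key.clone(), None))
        .collect();
    merged.extend(account_nodes.iter().map(|(k, n)| (k.clone(), Some(*n))));
    merged.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    merged
}

fn main() {
    let mut updated = HashMap::new();
    updated.insert(vec![0x5], 42);
    let mut removed = HashSet::new();
    removed.insert(vec![0x3]);
    removed.insert(vec![0x5]); // superseded by the update above
    assert_eq!(
        merge_and_sort(&updated, &removed),
        vec![(vec![0x3], None), (vec![0x5], Some(42))]
    );
}
```

With `TrieUpdatesSorted`, this list is built once in `into_sorted`, so writers can iterate `account_nodes` directly instead of re-deriving it on every call.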
/// Records the current values of all trie nodes which will be updated using the [`TrieUpdates`]
/// Records the current values of all trie nodes which will be updated using the `TrieUpdates`
/// into the trie changesets tables.
///
/// The intended usage of this method is to call it _prior_ to calling `write_trie_updates` with
/// the same [`TrieUpdates`].
/// the same `TrieUpdates`.
///
/// Returns the number of keys written.
fn write_trie_changesets(
@@ -2309,9 +2247,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider
let mut storage_tries = B256Map::<Vec<_>>::default();
let mut seen_storage_keys = HashSet::new();
let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
let storage_range = BlockNumberHashedAddress((from, B256::ZERO))..;
for entry in storages_cursor.walk_range(storage_range)? {
// Create the storage range starting at the `from` block
let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
for entry in storages_cursor.walk_range(storage_range_start..)? {
let (
BlockNumberHashedAddress((_, hashed_address)),
TrieChangeSetsEntry { nibbles, node },
@@ -2336,14 +2276,14 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
/// Writes storage trie updates from the given storage trie map.
/// Writes already-sorted storage trie updates from the given storage trie map.
///
/// First sorts the storage trie updates by the hashed address key, writing in sorted order.
/// Expects the storage trie updates to already be sorted by the hashed address key.
///
/// Returns the number of entries modified.
fn write_storage_trie_updates<'a>(
fn write_storage_trie_updates_sorted<'a>(
&self,
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
) -> ProviderResult<usize> {
let mut num_entries = 0;
let mut storage_tries = storage_tries.collect::<Vec<_>>();
@@ -2353,7 +2293,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
let mut db_storage_trie_cursor =
DatabaseStorageTrieCursor::new(cursor, *hashed_address);
num_entries +=
db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
cursor = db_storage_trie_cursor.cursor;
}
@@ -2361,10 +2301,10 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseP
}
/// Records the current values of all trie nodes which will be updated using the
/// [`StorageTrieUpdates`] into the storage trie changesets table.
/// `StorageTrieUpdates` into the storage trie changesets table.
///
/// The intended usage of this method is to call it _prior_ to calling
/// `write_storage_trie_updates` with the same set of [`StorageTrieUpdates`].
/// `write_storage_trie_updates` with the same set of `StorageTrieUpdates`.
///
/// Returns the number of keys written.
fn write_storage_trie_changesets<'a>(
@@ -2751,7 +2691,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
) -> ProviderResult<Chain<Self::Primitives>> {
let range = block + 1..=self.last_block_number()?;
self.unwind_trie_state_range(range.clone())?;
self.unwind_trie_state_from(block + 1)?;
// get execution res
let execution_state = self.take_state_above(block)?;
@@ -2769,9 +2709,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecu
}
fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
let range = block + 1..=self.last_block_number()?;
self.unwind_trie_state_range(range)?;
self.unwind_trie_state_from(block + 1)?;
// remove execution res
self.remove_state_above(block)?;
@@ -4163,4 +4101,224 @@ mod tests {
assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
}
}
#[test]
fn test_write_trie_updates_sorted() {
use reth_trie::{
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
BranchNodeCompact, StorageTrieEntry,
};
let factory = create_test_provider_factory();
let provider_rw = factory.provider_rw().unwrap();
// Pre-populate account trie with data that will be deleted
{
let tx = provider_rw.tx_ref();
let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
// Add account node that will be deleted
let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
cursor
.upsert(
to_delete,
&BranchNodeCompact::new(
0b1010_1010_1010_1010, // state_mask
0b0000_0000_0000_0000, // tree_mask
0b0000_0000_0000_0000, // hash_mask
vec![],
None,
),
)
.unwrap();
// Add account node that will be updated
let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
cursor
.upsert(
to_update,
&BranchNodeCompact::new(
0b0101_0101_0101_0101, // old state_mask (will be updated)
0b0000_0000_0000_0000, // tree_mask
0b0000_0000_0000_0000, // hash_mask
vec![],
None,
),
)
.unwrap();
}
// Pre-populate storage tries with data
let storage_address1 = B256::from([1u8; 32]);
let storage_address2 = B256::from([2u8; 32]);
{
let tx = provider_rw.tx_ref();
let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
// Add storage nodes for address1 (one will be deleted)
storage_cursor
.upsert(
storage_address1,
&StorageTrieEntry {
nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
node: BranchNodeCompact::new(
0b0011_0011_0011_0011, // will be deleted
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
),
},
)
.unwrap();
// Add storage nodes for address2 (will be wiped)
storage_cursor
.upsert(
storage_address2,
&StorageTrieEntry {
nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
node: BranchNodeCompact::new(
0b1100_1100_1100_1100, // will be wiped
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
),
},
)
.unwrap();
storage_cursor
.upsert(
storage_address2,
&StorageTrieEntry {
nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
node: BranchNodeCompact::new(
0b0011_1100_0011_1100, // will be wiped
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
),
},
)
.unwrap();
}
// Create sorted account trie updates
let account_nodes = vec![
(
Nibbles::from_nibbles([0x1, 0x2]),
Some(BranchNodeCompact::new(
0b1111_1111_1111_1111, // state_mask (updated)
0b0000_0000_0000_0000, // tree_mask
0b0000_0000_0000_0000, // hash_mask (no hashes)
vec![],
None,
)),
),
(Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
(
Nibbles::from_nibbles([0x5, 0x6]),
Some(BranchNodeCompact::new(
0b1111_1111_1111_1111, // state_mask
0b0000_0000_0000_0000, // tree_mask
0b0000_0000_0000_0000, // hash_mask (no hashes)
vec![],
None,
)),
),
];
// Create sorted storage trie updates
let storage_trie1 = StorageTrieUpdatesSorted {
is_deleted: false,
storage_nodes: vec![
(
Nibbles::from_nibbles([0x1, 0x0]),
Some(BranchNodeCompact::new(
0b1111_0000_0000_0000, // state_mask
0b0000_0000_0000_0000, // tree_mask
0b0000_0000_0000_0000, // hash_mask (no hashes)
vec![],
None,
)),
),
(Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
],
};
let storage_trie2 = StorageTrieUpdatesSorted {
is_deleted: true, // Wipe all storage for this address
storage_nodes: vec![],
};
let mut storage_tries = B256Map::default();
storage_tries.insert(storage_address1, storage_trie1);
storage_tries.insert(storage_address2, storage_trie2);
let trie_updates = TrieUpdatesSorted { account_nodes, storage_tries };
// Write the sorted trie updates
let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
// We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
// storage deletion = 5
assert_eq!(num_entries, 5);
// Verify account trie updates were written correctly
let tx = provider_rw.tx_ref();
let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
// Check first account node was updated
let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
let entry1 = cursor.seek_exact(nibbles1).unwrap();
assert!(entry1.is_some(), "Updated account node should exist");
let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
assert_eq!(
entry1.unwrap().1.state_mask,
expected_mask,
"Account node should have updated state_mask"
);
// Check deleted account node no longer exists
let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
let entry2 = cursor.seek_exact(nibbles2).unwrap();
assert!(entry2.is_none(), "Deleted account node should not exist");
// Check new account node exists
let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
let entry3 = cursor.seek_exact(nibbles3).unwrap();
assert!(entry3.is_some(), "New account node should exist");
// Verify storage trie updates were written correctly
let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
// Check storage for address1
let storage_entries1: Vec<_> = storage_cursor
.walk_dup(Some(storage_address1), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(
storage_entries1.len(),
1,
"Storage address1 should have 1 entry after deletion"
);
assert_eq!(
storage_entries1[0].1.nibbles.0,
Nibbles::from_nibbles([0x1, 0x0]),
"Remaining entry should be [0x1, 0x0]"
);
// Check storage for address2 was wiped
let storage_entries2: Vec<_> = storage_cursor
.walk_dup(Some(storage_address2), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
provider_rw.commit().unwrap();
}
}

View File

@@ -89,7 +89,7 @@ pub fn insert_genesis<N: ProviderNodeTypes<ChainSpec = ChainSpec>>(
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(reth_db::DatabaseError::from)?;
provider.write_trie_updates(&updates).unwrap();
provider.write_trie_updates(updates).unwrap();
provider.commit()?;

View File

@@ -909,7 +909,7 @@ mod tests {
}
let (_, updates) = StateRoot::from_tx(tx).root_with_updates().unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.write_trie_updates(updates).unwrap();
let mut state = State::builder().with_bundle_update().build();
@@ -1127,7 +1127,10 @@ mod tests {
assert_eq!(storage_root, storage_root_prehashed(init_storage.storage));
assert!(!storage_updates.is_empty());
provider_rw
.write_storage_trie_updates(core::iter::once((&hashed_address, &storage_updates)))
.write_storage_trie_updates_sorted(core::iter::once((
&hashed_address,
&storage_updates.into_sorted(),
)))
.unwrap();
// destroy the storage and re-create with new slots

View File

@@ -2,7 +2,7 @@ use alloc::vec::Vec;
use alloy_primitives::{Address, BlockNumber, Bytes, B256};
use reth_storage_errors::provider::ProviderResult;
use reth_trie_common::{
updates::{StorageTrieUpdates, StorageTrieUpdatesSorted, TrieUpdates, TrieUpdatesSorted},
updates::{StorageTrieUpdatesSorted, TrieUpdates, TrieUpdatesSorted},
AccountProof, HashedPostState, HashedStorage, MultiProof, MultiProofTargets, StorageMultiProof,
StorageProof, TrieInput,
};
@@ -93,7 +93,7 @@ pub trait StateProofProvider: Send + Sync {
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait TrieReader: Send + Sync {
/// Returns the [`TrieUpdatesSorted`] for reverting the trie database to its state prior to the
/// given block having been processed.
/// given block (and every block after it) having been processed.
fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted>;
}
@@ -103,7 +103,14 @@ pub trait TrieWriter: Send + Sync {
/// Writes trie updates to the database.
///
/// Returns the number of entries modified.
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize>;
fn write_trie_updates(&self, trie_updates: TrieUpdates) -> ProviderResult<usize> {
self.write_trie_updates_sorted(&trie_updates.into_sorted())
}
/// Writes already-sorted trie updates to the database.
///
/// Returns the number of entries modified.
fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize>;
/// Records the current values of all trie nodes which will be updated using the [`TrieUpdates`]
/// into the trie changesets tables.
@@ -135,21 +142,21 @@ pub trait TrieWriter: Send + Sync {
/// Storage Trie Writer
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StorageTrieWriter: Send + Sync {
/// Writes storage trie updates from the given storage trie map.
/// Writes already-sorted storage trie updates from the given storage trie map.
///
/// First sorts the storage trie updates by the hashed address key, writing in sorted order.
/// Expects the storage trie updates to already be sorted by the hashed address key.
///
/// Returns the number of entries modified.
fn write_storage_trie_updates<'a>(
fn write_storage_trie_updates_sorted<'a>(
&self,
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
) -> ProviderResult<usize>;
/// Records the current values of all trie nodes which will be updated using the
/// [`StorageTrieUpdates`] into the storage trie changesets table.
/// [`StorageTrieUpdatesSorted`] into the storage trie changesets table.
///
/// The intended usage of this method is to call it _prior_ to calling
/// `write_storage_trie_updates` with the same set of [`StorageTrieUpdates`].
/// `write_storage_trie_updates` with the same set of [`StorageTrieUpdatesSorted`].
///
/// The `updates_overlay` parameter allows providing additional in-memory trie updates that
/// should be considered when looking up current node values. When provided, these overlay
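
The net shape of the trait after this hunk is the pattern below: the unsorted entry point becomes a provided method that sorts once and forwards, so implementors only supply the sorted writer. A minimal self-contained model with stand-in types (not the actual reth trait):

```rust
use std::collections::HashMap;

// Stand-ins for TrieUpdates / TrieUpdatesSorted.
struct Updates(HashMap<Vec<u8>, Option<u64>>);
struct UpdatesSorted(Vec<(Vec<u8>, Option<u64>)>);

impl Updates {
    /// Mirrors `TrieUpdates::into_sorted`: consume the map, sort by key once.
    fn into_sorted(self) -> UpdatesSorted {
        let mut nodes: Vec<_> = self.0.into_iter().collect();
        nodes.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        UpdatesSorted(nodes)
    }
}

trait TrieWriter {
    /// Unsorted entry point: now a provided method that sorts and forwards.
    fn write_trie_updates(&self, updates: Updates) -> usize {
        self.write_trie_updates_sorted(&updates.into_sorted())
    }

    /// The one method implementors must supply.
    fn write_trie_updates_sorted(&self, updates: &UpdatesSorted) -> usize;
}

struct CountingWriter;

impl TrieWriter for CountingWriter {
    fn write_trie_updates_sorted(&self, updates: &UpdatesSorted) -> usize {
        updates.0.len()
    }
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert(vec![0x1], Some(7));
    nodes.insert(vec![0x2], None); // a removal
    assert_eq!(CountingWriter.write_trie_updates(Updates(nodes)), 2);
}
```

Taking `TrieUpdates` by value in the provided method is what drives the call-site changes throughout this commit, where `write_trie_updates(&updates)` becomes `write_trie_updates(updates)`.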

View File

@@ -438,6 +438,11 @@ pub struct TrieUpdatesSorted {
}
impl TrieUpdatesSorted {
/// Returns `true` if the updates are empty.
pub fn is_empty(&self) -> bool {
self.account_nodes.is_empty() && self.storage_tries.is_empty()
}
/// Returns reference to updated account nodes.
pub fn account_nodes_ref(&self) -> &[(Nibbles, Option<BranchNodeCompact>)] {
&self.account_nodes

View File

@@ -7,7 +7,7 @@ use reth_db_api::{
};
use reth_trie::{
trie_cursor::{TrieCursor, TrieCursorFactory},
updates::StorageTrieUpdates,
updates::StorageTrieUpdatesSorted,
BranchNodeCompact, Nibbles, StorageTrieEntry, StoredNibbles, StoredNibblesSubKey,
};
@@ -110,31 +110,19 @@ where
+ DbDupCursorRO<tables::StoragesTrie>
+ DbDupCursorRW<tables::StoragesTrie>,
{
/// Writes storage updates
pub fn write_storage_trie_updates(
/// Writes storage updates that are already sorted
pub fn write_storage_trie_updates_sorted(
&mut self,
updates: &StorageTrieUpdates,
updates: &StorageTrieUpdatesSorted,
) -> Result<usize, DatabaseError> {
// The storage trie for this account has to be deleted.
if updates.is_deleted() && self.cursor.seek_exact(self.hashed_address)?.is_some() {
self.cursor.delete_current_duplicates()?;
}
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut storage_updates = updates
.removed_nodes_ref()
.iter()
.filter_map(|n| (!updates.storage_nodes_ref().contains_key(n)).then_some((n, None)))
.collect::<Vec<_>>();
storage_updates.extend(
updates.storage_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
);
// Sort trie node updates.
storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let mut num_entries = 0;
for (nibbles, maybe_updated) in storage_updates.into_iter().filter(|(n, _)| !n.is_empty()) {
for (nibbles, maybe_updated) in updates.storage_nodes.iter().filter(|(n, _)| !n.is_empty())
{
num_entries += 1;
let nibbles = StoredNibblesSubKey(*nibbles);
// Delete the old entry if it exists.

View File

@@ -81,7 +81,11 @@ fn incremental_vs_full_root(inputs: &[&str], modified: &str) {
let modified_root = loader.root().unwrap();
// Update the intermediate roots table so that we can run the incremental verification
tx.write_storage_trie_updates(core::iter::once((&hashed_address, &trie_updates))).unwrap();
tx.write_storage_trie_updates_sorted(core::iter::once((
&hashed_address,
&trie_updates.into_sorted(),
)))
.unwrap();
// 3. Calculate the incremental root
let mut storage_changes = PrefixSetMut::default();
@@ -620,7 +624,7 @@ fn account_trie_around_extension_node_with_dbtrie() {
let (got, updates) = StateRoot::from_tx(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);
tx.write_trie_updates(&updates).unwrap();
tx.write_trie_updates(updates).unwrap();
// read the account updates from the db
let mut accounts_trie = tx.tx_ref().cursor_read::<tables::AccountsTrie>().unwrap();
@@ -667,7 +671,7 @@ proptest! {
state.iter().map(|(&key, &balance)| (key, (Account { balance, ..Default::default() }, std::iter::empty())))
);
assert_eq!(expected_root, state_root);
tx.write_trie_updates(&trie_updates).unwrap();
tx.write_trie_updates(trie_updates).unwrap();
}
}
}

View File

@@ -33,7 +33,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
provider_rw.write_hashed_state(&db_state.into_sorted()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.write_trie_updates(updates).unwrap();
provider_rw.commit().unwrap();
}

View File

@@ -4767,9 +4767,12 @@ mod tests {
state.keys().copied().collect::<Vec<_>>(),
);
// Extract account nodes before moving hash_builder_updates
let hash_builder_account_nodes = hash_builder_updates.account_nodes.clone();
// Write trie updates to the database
let provider_rw = provider_factory.provider_rw().unwrap();
provider_rw.write_trie_updates(&hash_builder_updates).unwrap();
provider_rw.write_trie_updates(hash_builder_updates).unwrap();
provider_rw.commit().unwrap();
// Assert that the sparse trie root matches the hash builder root
@@ -4777,7 +4780,7 @@ mod tests {
// Assert that the sparse trie updates match the hash builder updates
pretty_assertions::assert_eq!(
BTreeMap::from_iter(sparse_updates.updated_nodes),
BTreeMap::from_iter(hash_builder_updates.account_nodes)
BTreeMap::from_iter(hash_builder_account_nodes)
);
// Assert that the sparse trie nodes match the hash builder proof nodes
assert_eq_parallel_sparse_trie_proof_nodes(
@@ -4812,9 +4815,12 @@ mod tests {
state.keys().copied().collect::<Vec<_>>(),
);
// Extract account nodes before moving hash_builder_updates
let hash_builder_account_nodes = hash_builder_updates.account_nodes.clone();
// Write trie updates to the database
let provider_rw = provider_factory.provider_rw().unwrap();
provider_rw.write_trie_updates(&hash_builder_updates).unwrap();
provider_rw.write_trie_updates(hash_builder_updates).unwrap();
provider_rw.commit().unwrap();
// Assert that the sparse trie root matches the hash builder root
@@ -4822,7 +4828,7 @@ mod tests {
// Assert that the sparse trie updates match the hash builder updates
pretty_assertions::assert_eq!(
BTreeMap::from_iter(sparse_updates.updated_nodes),
BTreeMap::from_iter(hash_builder_updates.account_nodes)
BTreeMap::from_iter(hash_builder_account_nodes)
);
// Assert that the sparse trie nodes match the hash builder proof nodes
assert_eq_parallel_sparse_trie_proof_nodes(
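
The same clone-before-move adjustment recurs in the next file's hunks; it falls out of the by-value `write_trie_updates` signature, as in this minimal sketch (hypothetical types, illustrating only the ownership rule):

```rust
struct Updates {
    account_nodes: Vec<(u8, u8)>,
}

/// Stand-in for the new by-value `write_trie_updates`.
fn write(_updates: Updates) {}

fn main() {
    let hash_builder_updates = Updates { account_nodes: vec![(1, 2)] };
    // Extract what the later assertion needs before the value is moved.
    let hash_builder_account_nodes = hash_builder_updates.account_nodes.clone();
    write(hash_builder_updates); // `hash_builder_updates` is consumed here
    assert_eq!(hash_builder_account_nodes, vec![(1, 2)]);
}
```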

View File

@@ -2974,9 +2974,12 @@ mod tests {
state.keys().copied().collect::<Vec<_>>(),
);
// Extract account nodes before moving hash_builder_updates
let hash_builder_account_nodes = hash_builder_updates.account_nodes.clone();
// Write trie updates to the database
let provider_rw = provider_factory.provider_rw().unwrap();
provider_rw.write_trie_updates(&hash_builder_updates).unwrap();
provider_rw.write_trie_updates(hash_builder_updates).unwrap();
provider_rw.commit().unwrap();
// Assert that the sparse trie root matches the hash builder root
@@ -2984,7 +2987,7 @@ mod tests {
// Assert that the sparse trie updates match the hash builder updates
pretty_assertions::assert_eq!(
BTreeMap::from_iter(sparse_updates.updated_nodes),
BTreeMap::from_iter(hash_builder_updates.account_nodes)
BTreeMap::from_iter(hash_builder_account_nodes)
);
// Assert that the sparse trie nodes match the hash builder proof nodes
assert_eq_sparse_trie_proof_nodes(&updated_sparse, hash_builder_proof_nodes);
@@ -3016,9 +3019,12 @@ mod tests {
state.keys().copied().collect::<Vec<_>>(),
);
// Extract account nodes before moving hash_builder_updates
let hash_builder_account_nodes = hash_builder_updates.account_nodes.clone();
// Write trie updates to the database
let provider_rw = provider_factory.provider_rw().unwrap();
provider_rw.write_trie_updates(&hash_builder_updates).unwrap();
provider_rw.write_trie_updates(hash_builder_updates).unwrap();
provider_rw.commit().unwrap();
// Assert that the sparse trie root matches the hash builder root
@@ -3026,7 +3032,7 @@ mod tests {
// Assert that the sparse trie updates match the hash builder updates
pretty_assertions::assert_eq!(
BTreeMap::from_iter(sparse_updates.updated_nodes),
BTreeMap::from_iter(hash_builder_updates.account_nodes)
BTreeMap::from_iter(hash_builder_account_nodes)
);
// Assert that the sparse trie nodes match the hash builder proof nodes
assert_eq_sparse_trie_proof_nodes(&updated_sparse, hash_builder_proof_nodes);