mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
perf: optimize sparse trie (#22418)
Co-authored-by: Alexey Shekhirin <github@shekhirin.com> Co-authored-by: Brian Picciano <me@mediocregopher.com>
This commit is contained in:
@@ -31,7 +31,7 @@ use reth_trie_sparse::{
|
||||
SparseTrie,
|
||||
};
|
||||
use revm_primitives::{hash_map::Entry, B256Map};
|
||||
use tracing::{debug, debug_span, error, instrument};
|
||||
use tracing::{debug, debug_span, error, instrument, trace_span};
|
||||
|
||||
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
|
||||
const MAX_PENDING_UPDATES: usize = 100;
|
||||
@@ -489,50 +489,38 @@ where
|
||||
let storage_updates =
|
||||
if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
|
||||
|
||||
// Process all storage updates in parallel, skipping tries with no pending updates.
|
||||
let span = tracing::Span::current();
|
||||
let storage_results = storage_updates
|
||||
.iter_mut()
|
||||
.filter(|(_, updates)| !updates.is_empty())
|
||||
.map(|(address, updates)| {
|
||||
let trie = self.trie.take_or_create_storage_trie(address);
|
||||
let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
|
||||
// Process all storage updates, skipping tries with no pending updates.
|
||||
let span = debug_span!("process_storage_leaf_updates").entered();
|
||||
for (address, updates) in storage_updates {
|
||||
if updates.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
|
||||
|
||||
(address, updates, fetched, trie)
|
||||
})
|
||||
.par_bridge_buffered()
|
||||
.map(|(address, updates, mut fetched, mut trie)| {
|
||||
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
|
||||
let mut targets = Vec::new();
|
||||
let trie = self.trie.get_or_create_storage_trie_mut(*address);
|
||||
let fetched = self.fetched_storage_targets.entry(*address).or_default();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
if min_len < *entry.get() {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
SparseTrieResult::Ok((address, targets, fetched, trie))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
drop(span);
|
||||
|
||||
for (address, targets, fetched, trie) in storage_results {
|
||||
self.fetched_storage_targets.insert(*address, fetched);
|
||||
self.trie.insert_storage_trie(*address, trie);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(min_len);
|
||||
targets.push(Target::new(path).with_min_len(min_len));
|
||||
}
|
||||
})?;
|
||||
|
||||
if !targets.is_empty() {
|
||||
self.pending_targets.extend_storage_targets(address, targets);
|
||||
}
|
||||
}
|
||||
|
||||
drop(span);
|
||||
|
||||
// Process account trie updates and fill the account targets.
|
||||
self.process_account_leaf_updates(new)?;
|
||||
|
||||
|
||||
@@ -486,23 +486,14 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
// `new_nodes` to keep track of any nodes that were created during the traversal.
|
||||
let mut new_nodes = Vec::new();
|
||||
let mut next = Some(Nibbles::default());
|
||||
// Track the original node that was modified (path, original_node) for rollback
|
||||
let mut modified_original: Option<(Nibbles, SparseNode)> = None;
|
||||
|
||||
// Traverse the upper subtrie to find the node to update or the subtrie to update.
|
||||
//
|
||||
// We stop when the next node to traverse would be in a lower subtrie, or if there are no
|
||||
// more nodes to traverse.
|
||||
while let Some(current) =
|
||||
next.filter(|next| SparseSubtrieType::path_len_is_upper(next.len()))
|
||||
next.as_mut().filter(|next| SparseSubtrieType::path_len_is_upper(next.len()))
|
||||
{
|
||||
// Save original node for potential rollback (only if not already saved)
|
||||
if modified_original.is_none() &&
|
||||
let Some(node) = self.upper_subtrie.nodes.get(¤t)
|
||||
{
|
||||
modified_original = Some((current, node.clone()));
|
||||
}
|
||||
|
||||
// Traverse the next node, keeping track of any changed nodes and the next step in the
|
||||
// trie. If traversal fails, clean up the value we inserted and propagate the error.
|
||||
let step_result = self.upper_subtrie.update_next_node(current, &full_path);
|
||||
@@ -513,11 +504,7 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
}
|
||||
|
||||
match step_result? {
|
||||
LeafUpdateStep::Continue { next_node } => {
|
||||
next = Some(next_node);
|
||||
// Clear modified_original since we haven't actually modified anything yet
|
||||
modified_original = None;
|
||||
}
|
||||
LeafUpdateStep::Continue => {}
|
||||
LeafUpdateStep::Complete { inserted_nodes } => {
|
||||
new_nodes.extend(inserted_nodes);
|
||||
next = None;
|
||||
@@ -1096,27 +1083,48 @@ impl SparseTrie for ParallelSparseTrie {
|
||||
let mut curr_subtrie_is_upper = true;
|
||||
|
||||
loop {
|
||||
let curr_node = curr_subtrie.nodes.get(&curr_path).unwrap();
|
||||
|
||||
match Self::find_next_to_leaf(&curr_path, curr_node, full_path) {
|
||||
FindNextToLeafOutcome::NotFound => return Ok(LeafLookup::NonExistent),
|
||||
FindNextToLeafOutcome::BlindedNode { path, hash } => {
|
||||
return Err(LeafLookupError::BlindedNode { path, hash });
|
||||
match curr_subtrie.nodes.get(&curr_path).unwrap() {
|
||||
SparseNode::Empty => return Ok(LeafLookup::NonExistent),
|
||||
SparseNode::Leaf { key, .. } => {
|
||||
let mut found_full_path = curr_path;
|
||||
found_full_path.extend(key);
|
||||
assert!(&found_full_path != full_path, "target leaf {full_path:?} found, even though value wasn't in values hashmap");
|
||||
return Ok(LeafLookup::NonExistent)
|
||||
}
|
||||
FindNextToLeafOutcome::Found => {
|
||||
panic!("target leaf {full_path:?} found at path {curr_path:?}, even though value wasn't in values hashmap");
|
||||
}
|
||||
FindNextToLeafOutcome::ContinueFrom(next_path) => {
|
||||
curr_path = next_path;
|
||||
// If we were previously looking at the upper trie, and the new path is in the
|
||||
// lower trie, we need to pull out a ref to the lower trie.
|
||||
if curr_subtrie_is_upper &&
|
||||
let Some(lower_subtrie) = self.lower_subtrie_for_path(&curr_path)
|
||||
{
|
||||
curr_subtrie = lower_subtrie;
|
||||
curr_subtrie_is_upper = false;
|
||||
SparseNode::Extension { key, .. } => {
|
||||
if full_path.len() == curr_path.len() {
|
||||
return Ok(LeafLookup::NonExistent)
|
||||
}
|
||||
curr_path.extend(key);
|
||||
if !full_path.starts_with(&curr_path) {
|
||||
return Ok(LeafLookup::NonExistent)
|
||||
}
|
||||
}
|
||||
SparseNode::Branch { state_mask, blinded_mask, blinded_hashes, .. } => {
|
||||
if full_path.len() == curr_path.len() {
|
||||
return Ok(LeafLookup::NonExistent)
|
||||
}
|
||||
let nibble = full_path.get_unchecked(curr_path.len());
|
||||
if !state_mask.is_bit_set(nibble) {
|
||||
return Ok(LeafLookup::NonExistent)
|
||||
}
|
||||
curr_path.push_unchecked(nibble);
|
||||
if blinded_mask.is_bit_set(nibble) {
|
||||
return Err(LeafLookupError::BlindedNode {
|
||||
path: curr_path,
|
||||
hash: blinded_hashes[nibble as usize],
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we were previously looking at the upper trie, and the new path is in the
|
||||
// lower trie, we need to pull out a ref to the lower trie.
|
||||
if curr_subtrie_is_upper &&
|
||||
let Some(lower_subtrie) = self.lower_subtrie_for_path(&curr_path)
|
||||
{
|
||||
curr_subtrie = lower_subtrie;
|
||||
curr_subtrie_is_upper = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2492,38 +2500,10 @@ impl SparseSubtrie {
|
||||
// Here we are starting at the root of the subtrie, and traversing from there.
|
||||
let mut current = Some(self.path);
|
||||
|
||||
// Track inserted nodes and modified original for rollback on error
|
||||
let mut inserted_nodes: Vec<Nibbles> = Vec::new();
|
||||
let mut modified_original: Option<(Nibbles, SparseNode)> = None;
|
||||
|
||||
while let Some(current_path) = current {
|
||||
// Save original node for potential rollback (only if not already saved)
|
||||
if modified_original.is_none() &&
|
||||
let Some(node) = self.nodes.get(¤t_path)
|
||||
{
|
||||
modified_original = Some((current_path, node.clone()));
|
||||
}
|
||||
|
||||
let step_result = self.update_next_node(current_path, &full_path);
|
||||
|
||||
if let Err(err) = step_result {
|
||||
self.rollback_leaf_insert(&full_path, &inserted_nodes, modified_original.take());
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
match step_result? {
|
||||
LeafUpdateStep::Continue { next_node } => {
|
||||
current = Some(next_node);
|
||||
// Clear modified_original since we haven't actually modified anything yet
|
||||
modified_original = None;
|
||||
}
|
||||
LeafUpdateStep::Complete { inserted_nodes: new_inserted } => {
|
||||
inserted_nodes.extend(new_inserted);
|
||||
current = None;
|
||||
}
|
||||
LeafUpdateStep::NodeNotFound => {
|
||||
current = None;
|
||||
}
|
||||
while let Some(current_path) = current.as_mut() {
|
||||
match self.update_next_node(current_path, &full_path)? {
|
||||
LeafUpdateStep::Continue => {}
|
||||
LeafUpdateStep::NodeNotFound | LeafUpdateStep::Complete { .. } => break,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2533,30 +2513,6 @@ impl SparseSubtrie {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Rollback structural changes made during a failed leaf insert.
|
||||
///
|
||||
/// This removes any nodes that were inserted and restores the original node
|
||||
/// that was modified, ensuring atomicity of `update_leaf`.
|
||||
fn rollback_leaf_insert(
|
||||
&mut self,
|
||||
full_path: &Nibbles,
|
||||
inserted_nodes: &[Nibbles],
|
||||
modified_original: Option<(Nibbles, SparseNode)>,
|
||||
) {
|
||||
// Remove any values that may have been inserted
|
||||
self.inner.values.remove(full_path);
|
||||
|
||||
// Remove all inserted nodes
|
||||
for node_path in inserted_nodes {
|
||||
self.nodes.remove(node_path);
|
||||
}
|
||||
|
||||
// Restore the original node that was modified
|
||||
if let Some((path, original_node)) = modified_original {
|
||||
self.nodes.insert(path, original_node);
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes the current node, returning what to do next in the leaf update process.
|
||||
///
|
||||
/// This will add or update any nodes in the trie as necessary.
|
||||
@@ -2565,13 +2521,13 @@ impl SparseSubtrie {
|
||||
/// the paths of nodes that were inserted during this step.
|
||||
fn update_next_node(
|
||||
&mut self,
|
||||
mut current: Nibbles,
|
||||
current: &mut Nibbles,
|
||||
path: &Nibbles,
|
||||
) -> SparseTrieResult<LeafUpdateStep> {
|
||||
debug_assert!(path.starts_with(&self.path));
|
||||
debug_assert!(current.starts_with(&self.path));
|
||||
debug_assert!(path.starts_with(¤t));
|
||||
let Some(node) = self.nodes.get_mut(¤t) else {
|
||||
debug_assert!(path.starts_with(current));
|
||||
let Some(node) = self.nodes.get_mut(current) else {
|
||||
return Ok(LeafUpdateStep::NodeNotFound);
|
||||
};
|
||||
|
||||
@@ -2581,16 +2537,13 @@ impl SparseSubtrie {
|
||||
// the subtrie.
|
||||
let path = path.slice(self.path.len()..);
|
||||
*node = SparseNode::new_leaf(path);
|
||||
Ok(LeafUpdateStep::complete_with_insertions(vec![current]))
|
||||
Ok(LeafUpdateStep::complete_with_insertions(vec![*current]))
|
||||
}
|
||||
SparseNode::Leaf { key: current_key, .. } => {
|
||||
current.extend(current_key);
|
||||
|
||||
// this leaf is being updated
|
||||
debug_assert!(
|
||||
¤t != path,
|
||||
"we already checked leaf presence in the beginning"
|
||||
);
|
||||
debug_assert!(current != path, "we already checked leaf presence in the beginning");
|
||||
|
||||
// find the common prefix
|
||||
let common = current.common_prefix_length(path);
|
||||
@@ -2625,7 +2578,7 @@ impl SparseSubtrie {
|
||||
SparseNode::Extension { key, .. } => {
|
||||
current.extend(key);
|
||||
|
||||
if !path.starts_with(¤t) {
|
||||
if !path.starts_with(current) {
|
||||
// find the common prefix
|
||||
let common = current.common_prefix_length(path);
|
||||
*key = current.slice(current.len() - key.len()..common);
|
||||
@@ -2659,28 +2612,29 @@ impl SparseSubtrie {
|
||||
return Ok(LeafUpdateStep::complete_with_insertions(inserted_nodes))
|
||||
}
|
||||
|
||||
Ok(LeafUpdateStep::continue_with(current))
|
||||
Ok(LeafUpdateStep::Continue)
|
||||
}
|
||||
SparseNode::Branch { state_mask, blinded_mask, blinded_hashes, .. } => {
|
||||
let nibble = path.get_unchecked(current.len());
|
||||
current.push_unchecked(nibble);
|
||||
|
||||
if !state_mask.is_bit_set(nibble) {
|
||||
state_mask.set_bit(nibble);
|
||||
let new_leaf = SparseNode::new_leaf(path.slice(current.len()..));
|
||||
self.nodes.insert(current, new_leaf);
|
||||
return Ok(LeafUpdateStep::complete_with_insertions(vec![current]))
|
||||
self.nodes.insert(*current, new_leaf);
|
||||
return Ok(LeafUpdateStep::complete_with_insertions(vec![*current]))
|
||||
}
|
||||
|
||||
if blinded_mask.is_bit_set(nibble) {
|
||||
return Err(SparseTrieErrorKind::BlindedNode {
|
||||
path: current,
|
||||
path: *current,
|
||||
hash: blinded_hashes[nibble as usize],
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
// If the nibble is set, we can continue traversing the branch.
|
||||
Ok(LeafUpdateStep::continue_with(current))
|
||||
Ok(LeafUpdateStep::Continue)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3381,10 +3335,7 @@ impl SparseSubtrieInner {
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Default)]
|
||||
pub enum LeafUpdateStep {
|
||||
/// Continue traversing to the next node
|
||||
Continue {
|
||||
/// The next node path to process
|
||||
next_node: Nibbles,
|
||||
},
|
||||
Continue,
|
||||
/// Update is complete with nodes inserted
|
||||
Complete {
|
||||
/// The node paths that were inserted during this step
|
||||
@@ -3396,11 +3347,6 @@ pub enum LeafUpdateStep {
|
||||
}
|
||||
|
||||
impl LeafUpdateStep {
|
||||
/// Creates a step to continue with the next node
|
||||
pub const fn continue_with(next_node: Nibbles) -> Self {
|
||||
Self::Continue { next_node }
|
||||
}
|
||||
|
||||
/// Creates a step indicating completion with inserted nodes
|
||||
pub const fn complete_with_insertions(inserted_nodes: Vec<Nibbles>) -> Self {
|
||||
Self::Complete { inserted_nodes }
|
||||
|
||||
Reference in New Issue
Block a user