perf(trie): ParallelSparseTrie::update_subtrie_hashes boilerplate (#16948)

This commit is contained in:
Alexey Shekhirin
2025-06-19 20:29:06 +01:00
committed by GitHub
parent f59a82e4c6
commit 9231652c6c
4 changed files with 251 additions and 71 deletions

1
Cargo.lock generated
View File

@@ -10586,6 +10586,7 @@ dependencies = [
"reth-primitives-traits",
"reth-trie-common",
"reth-trie-sparse",
"smallvec",
"tracing",
]

View File

@@ -23,6 +23,9 @@ alloy-trie.workspace = true
alloy-primitives.workspace = true
alloy-rlp.workspace = true
# misc
smallvec.workspace = true
[dev-dependencies]
# reth
reth-primitives-traits.workspace = true

View File

@@ -1,3 +1,5 @@
use std::sync::mpsc;
use alloy_primitives::{
map::{Entry, HashMap},
B256,
@@ -7,9 +9,13 @@ use alloy_trie::TrieMask;
use reth_execution_errors::{SparseTrieErrorKind, SparseTrieResult};
use reth_trie_common::{
prefix_set::{PrefixSet, PrefixSetMut},
Nibbles, TrieNode, CHILD_INDEX_RANGE,
Nibbles, RlpNode, TrieNode, CHILD_INDEX_RANGE,
};
use reth_trie_sparse::{blinded::BlindedProvider, SparseNode, SparseTrieUpdates, TrieMasks};
use reth_trie_sparse::{
blinded::BlindedProvider, RlpNodePathStackItem, RlpNodeStackItem, SparseNode,
SparseTrieUpdates, TrieMasks,
};
use smallvec::SmallVec;
use tracing::trace;
/// A revealed sparse trie with subtries that can be updated in parallel.
@@ -22,9 +28,12 @@ use tracing::trace;
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct ParallelSparseTrie {
/// This contains the trie nodes for the upper part of the trie.
upper_subtrie: SparseSubtrie,
upper_subtrie: Box<SparseSubtrie>,
/// An array containing the subtries at the second level of the trie.
lower_subtries: Box<[Option<SparseSubtrie>; 256]>,
lower_subtries: [Option<Box<SparseSubtrie>>; 256],
/// Set of prefixes (key paths) that have been marked as updated.
/// This is used to track which parts of the trie need to be recalculated.
prefix_set: PrefixSetMut,
/// Optional tracking of trie updates for later use.
updates: Option<SparseTrieUpdates>,
}
@@ -32,8 +41,9 @@ pub struct ParallelSparseTrie {
impl Default for ParallelSparseTrie {
fn default() -> Self {
Self {
upper_subtrie: SparseSubtrie::default(),
lower_subtries: Box::new([const { None }; 256]),
upper_subtrie: Box::default(),
lower_subtries: [const { None }; 256],
prefix_set: PrefixSetMut::default(),
updates: None,
}
}
@@ -42,13 +52,13 @@ impl Default for ParallelSparseTrie {
impl ParallelSparseTrie {
/// Returns mutable ref to the lower `SparseSubtrie` for the given path, or None if the path
/// belongs to the upper trie.
fn lower_subtrie_for_path(&mut self, path: &Nibbles) -> Option<&mut SparseSubtrie> {
fn lower_subtrie_for_path(&mut self, path: &Nibbles) -> Option<&mut Box<SparseSubtrie>> {
match SparseSubtrieType::from_path(path) {
SparseSubtrieType::Upper => None,
SparseSubtrieType::Lower(idx) => {
if self.lower_subtries[idx].is_none() {
let upper_path = path.slice(..2);
self.lower_subtries[idx] = Some(SparseSubtrie::new(upper_path));
self.lower_subtries[idx] = Some(Box::new(SparseSubtrie::new(upper_path)));
}
self.lower_subtries[idx].as_mut()
@@ -185,9 +195,28 @@ impl ParallelSparseTrie {
///
/// This function first identifies all nodes that have changed (based on the prefix set) below
/// level 2 of the trie, then recalculates their RLP representation.
pub fn update_subtrie_hashes(&mut self) -> SparseTrieResult<()> {
pub fn update_subtrie_hashes(&mut self) {
trace!(target: "trie::parallel_sparse", "Updating subtrie hashes");
todo!()
// Take changed subtries according to the prefix set
let mut prefix_set = core::mem::take(&mut self.prefix_set).freeze();
let (subtries, unchanged_prefix_set) = self.take_changed_lower_subtries(&mut prefix_set);
// Update the prefix set with the keys that didn't have matching subtries
self.prefix_set = unchanged_prefix_set;
// Update subtrie hashes in parallel
// TODO: call `update_hashes` on each subtrie in parallel
let (tx, rx) = mpsc::channel();
for subtrie in subtries {
tx.send((subtrie.index, subtrie.subtrie)).unwrap();
}
drop(tx);
// Return updated subtries back to the trie
for (index, subtrie) in rx {
self.lower_subtries[index] = Some(subtrie);
}
}
/// Calculates and returns the root hash of the trie.
@@ -206,53 +235,82 @@ impl ParallelSparseTrie {
/// If `retain_updates` is true, the trie will record branch node updates and deletions.
/// This information can then be used to efficiently update an external database.
pub fn with_updates(mut self, retain_updates: bool) -> Self {
if retain_updates {
self.updates = Some(SparseTrieUpdates::default());
}
self.updates = retain_updates.then_some(SparseTrieUpdates::default());
self
}
/// Returns a list of [subtries](SparseSubtrie) identifying the subtries that have changed
/// according to the provided [prefix set](PrefixSet).
/// Consumes and returns the currently accumulated trie updates.
///
/// Along with the subtries, prefix sets are returned. Each prefix set contains the keys from
/// the original prefix set that belong to the subtrie.
/// This is useful when you want to apply the updates to an external database,
/// and then start tracking a new set of updates.
pub fn take_updates(&mut self) -> SparseTrieUpdates {
core::iter::once(&mut self.upper_subtrie)
.chain(self.lower_subtries.iter_mut().flatten())
.fold(SparseTrieUpdates::default(), |mut acc, subtrie| {
acc.extend(subtrie.take_updates());
acc
})
}
/// Returns:
/// 1. List of lower [subtries](SparseSubtrie) that have changed according to the provided
/// [prefix set](PrefixSet). See documentation of [`ChangedSubtrie`] for more details.
/// 2. Prefix set of keys that do not belong to any lower subtrie.
///
/// This method helps optimize hash recalculations by identifying which specific
/// subtries need to be updated. Each subtrie can then be updated in parallel.
#[allow(unused)]
fn get_changed_subtries(
/// lower subtries need to be updated. Each lower subtrie can then be updated in parallel.
///
/// IMPORTANT: The method removes the subtries from `lower_subtries`, and the caller is
/// responsible for returning them back into the array.
fn take_changed_lower_subtries(
&mut self,
prefix_set: &mut PrefixSet,
) -> Vec<(SparseSubtrie, PrefixSet)> {
) -> (Vec<ChangedSubtrie>, PrefixSetMut) {
// Clone the prefix set to iterate over its keys. Cloning is cheap, it's just an Arc.
let prefix_set_clone = prefix_set.clone();
let mut prefix_set_iter = prefix_set_clone.into_iter();
let mut prefix_set_iter = prefix_set_clone.into_iter().cloned().peekable();
let mut changed_subtries = Vec::new();
let mut unchanged_prefix_set = PrefixSetMut::default();
let mut subtries = Vec::new();
for subtrie in self.lower_subtries.iter_mut() {
for (index, subtrie) in self.lower_subtries.iter_mut().enumerate() {
if let Some(subtrie) = subtrie.take_if(|subtrie| prefix_set.contains(&subtrie.path)) {
let prefix_set = if prefix_set.all() {
unchanged_prefix_set = PrefixSetMut::all();
PrefixSetMut::all()
} else {
// Take those keys from the original prefix set that start with the subtrie path
//
// Subtries are stored in the order of their paths, so we can use the same
// prefix set iterator.
PrefixSetMut::from(
prefix_set_iter
.by_ref()
.skip_while(|key| key < &&subtrie.path)
.take_while(|key| key.has_prefix(&subtrie.path))
.cloned(),
)
let mut new_prefix_set = Vec::new();
while let Some(key) = prefix_set_iter.peek() {
if key.has_prefix(&subtrie.path) {
// If the key starts with the subtrie path, add it to the new prefix set
new_prefix_set.push(prefix_set_iter.next().unwrap());
} else if new_prefix_set.is_empty() && key < &subtrie.path {
// If we didn't yet have any keys that belong to this subtrie, and the
// current key is still less than the subtrie path, add it to the
// unchanged prefix set
unchanged_prefix_set.insert(prefix_set_iter.next().unwrap());
} else {
// If we're past the subtrie path, we're done with this subtrie. Do not
// advance the iterator, the next key will be processed either by the
// next subtrie or inserted into the unchanged prefix set.
break
}
}
PrefixSetMut::from(new_prefix_set)
}
.freeze();
subtries.push((subtrie, prefix_set));
changed_subtries.push(ChangedSubtrie { index, subtrie, prefix_set });
}
}
subtries
// Extend the unchanged prefix set with the remaining keys that are not part of any subtries
unchanged_prefix_set.extend_keys(prefix_set_iter);
(changed_subtries, unchanged_prefix_set)
}
}
@@ -274,6 +332,10 @@ pub struct SparseSubtrie {
/// Map from leaf key paths to their values.
/// All values are stored here instead of directly in leaf nodes.
values: HashMap<Nibbles, Vec<u8>>,
/// Optional tracking of trie updates for later use.
updates: Option<SparseTrieUpdates>,
/// Reusable buffers for [`SparseSubtrie::update_hashes`].
buffers: SparseSubtrieBuffers,
}
impl SparseSubtrie {
@@ -281,6 +343,15 @@ impl SparseSubtrie {
Self { path, ..Default::default() }
}
/// Configures the subtrie to retain information about updates.
///
/// If `retain_updates` is true, the trie will record branch node updates and deletions.
/// This information can then be used to efficiently update an external database.
pub fn with_updates(mut self, retain_updates: bool) -> Self {
self.updates = retain_updates.then_some(SparseTrieUpdates::default());
self
}
/// Returns true if the current path and its child are both found in the same level. This
/// function assumes that if `current_path` is in a lower level then `child_path` is too.
fn is_child_same_level(current_path: &Nibbles, child_path: &Nibbles) -> bool {
@@ -485,11 +556,19 @@ impl SparseSubtrie {
}
/// Recalculates and updates the RLP hashes for the changed nodes in this subtrie.
pub fn update_hashes(&mut self, prefix_set: &mut PrefixSet) -> SparseTrieResult<()> {
#[allow(unused)]
fn update_hashes(&self, _prefix_set: &mut PrefixSet) -> RlpNode {
trace!(target: "trie::parallel_sparse", path=?self.path, "Updating subtrie hashes");
let _prefix_set = prefix_set;
todo!()
}
/// Consumes and returns the currently accumulated trie updates.
///
/// This is useful when you want to apply the updates to an external database,
/// and then start tracking a new set of updates.
fn take_updates(&mut self) -> SparseTrieUpdates {
self.updates.take().unwrap_or_default()
}
}
/// Sparse Subtrie Type.
@@ -518,6 +597,43 @@ impl SparseSubtrieType {
Self::Lower(path_subtrie_index_unchecked(path))
}
}
/// Returns the index of the lower subtrie, if it exists.
pub const fn lower_index(&self) -> Option<usize> {
match self {
Self::Upper => None,
Self::Lower(index) => Some(*index),
}
}
}
/// Collection of reusable buffers for calculating subtrie hashes.
///
/// These buffers reduce allocations when computing RLP representations during trie updates.
#[derive(Clone, PartialEq, Eq, Debug, Default)]
pub struct SparseSubtrieBuffers {
/// Stack of RLP node paths
path_stack: Vec<RlpNodePathStackItem>,
/// Stack of RLP nodes
rlp_node_stack: Vec<RlpNodeStackItem>,
/// Reusable branch child path
branch_child_buf: SmallVec<[Nibbles; 16]>,
/// Reusable branch value stack
branch_value_stack_buf: SmallVec<[RlpNode; 16]>,
/// Reusable RLP buffer
rlp_buf: Vec<u8>,
}
/// Changed subtrie.
#[derive(Debug)]
struct ChangedSubtrie {
/// Lower subtrie index in the range [0, 255].
index: usize,
/// Changed subtrie
subtrie: Box<SparseSubtrie>,
/// Prefix set of keys that belong to the subtrie.
#[allow(unused)]
prefix_set: PrefixSet,
}
/// Convert first two nibbles of the path into a lower subtrie index in the range [0, 255].
@@ -534,14 +650,15 @@ mod tests {
use super::{
path_subtrie_index_unchecked, ParallelSparseTrie, SparseSubtrie, SparseSubtrieType,
};
use crate::trie::ChangedSubtrie;
use alloy_primitives::B256;
use alloy_rlp::Encodable;
use alloy_trie::Nibbles;
use assert_matches::assert_matches;
use reth_primitives_traits::Account;
use reth_trie_common::{
prefix_set::{PrefixSet, PrefixSetMut},
BranchNode, ExtensionNode, LeafNode, RlpNode, TrieMask, TrieNode, EMPTY_ROOT_HASH,
prefix_set::PrefixSetMut, BranchNode, ExtensionNode, LeafNode, RlpNode, TrieMask, TrieNode,
EMPTY_ROOT_HASH,
};
use reth_trie_sparse::{SparseNode, TrieMasks};
@@ -586,21 +703,22 @@ mod tests {
#[test]
fn test_get_changed_subtries_empty() {
let mut trie = ParallelSparseTrie::default();
let mut prefix_set = PrefixSet::default();
let mut prefix_set = PrefixSetMut::from([Nibbles::default()]).freeze();
let changed = trie.get_changed_subtries(&mut prefix_set);
assert!(changed.is_empty());
let (subtries, unchanged_prefix_set) = trie.take_changed_lower_subtries(&mut prefix_set);
assert!(subtries.is_empty());
assert_eq!(unchanged_prefix_set, PrefixSetMut::from(prefix_set.iter().cloned()));
}
#[test]
fn test_get_changed_subtries() {
// Create a trie with three subtries
let mut trie = ParallelSparseTrie::default();
let subtrie_1 = SparseSubtrie::new(Nibbles::from_nibbles([0x0, 0x0]));
let subtrie_1 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x0, 0x0])));
let subtrie_1_index = path_subtrie_index_unchecked(&subtrie_1.path);
let subtrie_2 = SparseSubtrie::new(Nibbles::from_nibbles([0x1, 0x0]));
let subtrie_2 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x1, 0x0])));
let subtrie_2_index = path_subtrie_index_unchecked(&subtrie_2.path);
let subtrie_3 = SparseSubtrie::new(Nibbles::from_nibbles([0x3, 0x0]));
let subtrie_3 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x3, 0x0])));
let subtrie_3_index = path_subtrie_index_unchecked(&subtrie_3.path);
// Add subtries at specific positions
@@ -608,28 +726,30 @@ mod tests {
trie.lower_subtries[subtrie_2_index] = Some(subtrie_2.clone());
trie.lower_subtries[subtrie_3_index] = Some(subtrie_3);
let unchanged_prefix_set = PrefixSetMut::from([
Nibbles::from_nibbles_unchecked([0x0]),
Nibbles::from_nibbles_unchecked([0x2, 0x0, 0x0]),
]);
// Create a prefix set with the keys that match only the second subtrie
let mut prefix_set = PrefixSetMut::from([
// Doesn't match any subtries
Nibbles::from_nibbles_unchecked([0x0]),
// Match second subtrie
Nibbles::from_nibbles_unchecked([0x1, 0x0, 0x0]),
Nibbles::from_nibbles_unchecked([0x1, 0x0, 0x1, 0x0]),
// Doesn't match any subtries
Nibbles::from_nibbles_unchecked([0x2, 0x0, 0x0]),
])
.freeze();
]);
prefix_set.extend(unchanged_prefix_set);
let mut prefix_set = prefix_set.freeze();
// Second subtrie should be removed and returned
let changed = trie.get_changed_subtries(&mut prefix_set);
let (subtries, unchanged_prefix_set) = trie.take_changed_lower_subtries(&mut prefix_set);
assert_eq!(
changed
subtries
.into_iter()
.map(|(subtrie, prefix_set)| {
(subtrie, prefix_set.iter().cloned().collect::<Vec<_>>())
.map(|ChangedSubtrie { index, subtrie, prefix_set }| {
(index, subtrie, prefix_set.iter().cloned().collect::<Vec<_>>())
})
.collect::<Vec<_>>(),
vec![(
subtrie_2_index,
subtrie_2,
vec![
Nibbles::from_nibbles_unchecked([0x1, 0x0, 0x0]),
@@ -637,6 +757,7 @@ mod tests {
]
)]
);
assert_eq!(unchanged_prefix_set, unchanged_prefix_set);
assert!(trie.lower_subtries[subtrie_2_index].is_none());
// First subtrie should remain unchanged
@@ -647,11 +768,11 @@ mod tests {
fn test_get_changed_subtries_all() {
// Create a trie with three subtries
let mut trie = ParallelSparseTrie::default();
let subtrie_1 = SparseSubtrie::new(Nibbles::from_nibbles([0x0, 0x0]));
let subtrie_1 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x0, 0x0])));
let subtrie_1_index = path_subtrie_index_unchecked(&subtrie_1.path);
let subtrie_2 = SparseSubtrie::new(Nibbles::from_nibbles([0x1, 0x0]));
let subtrie_2 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x1, 0x0])));
let subtrie_2_index = path_subtrie_index_unchecked(&subtrie_2.path);
let subtrie_3 = SparseSubtrie::new(Nibbles::from_nibbles([0x3, 0x0]));
let subtrie_3 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x3, 0x0])));
let subtrie_3_index = path_subtrie_index_unchecked(&subtrie_3.path);
// Add subtries at specific positions
@@ -663,14 +784,22 @@ mod tests {
let mut prefix_set = PrefixSetMut::all().freeze();
// All subtries should be removed and returned
let changed = trie.get_changed_subtries(&mut prefix_set);
let (subtries, unchanged_prefix_set) = trie.take_changed_lower_subtries(&mut prefix_set);
assert_eq!(
changed
subtries
.into_iter()
.map(|(subtrie, prefix_set)| { (subtrie, prefix_set.all()) })
.map(|ChangedSubtrie { index, subtrie, prefix_set }| {
(index, subtrie, prefix_set.all())
})
.collect::<Vec<_>>(),
vec![(subtrie_1, true), (subtrie_2, true), (subtrie_3, true)]
vec![
(subtrie_1_index, subtrie_1, true),
(subtrie_2_index, subtrie_2, true),
(subtrie_3_index, subtrie_3, true)
]
);
assert_eq!(unchanged_prefix_set, PrefixSetMut::all());
assert!(trie.lower_subtries.iter().all(Option::is_none));
}
@@ -864,4 +993,44 @@ mod tests {
);
}
}
#[test]
fn test_update_subtrie_hashes() {
// Create a trie with three subtries
let mut trie = ParallelSparseTrie::default();
let subtrie_1 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x0, 0x0])));
let subtrie_1_index = path_subtrie_index_unchecked(&subtrie_1.path);
let subtrie_2 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x1, 0x0])));
let subtrie_2_index = path_subtrie_index_unchecked(&subtrie_2.path);
let subtrie_3 = Box::new(SparseSubtrie::new(Nibbles::from_nibbles([0x3, 0x0])));
let subtrie_3_index = path_subtrie_index_unchecked(&subtrie_3.path);
// Add subtries at specific positions
trie.lower_subtries[subtrie_1_index] = Some(subtrie_1);
trie.lower_subtries[subtrie_2_index] = Some(subtrie_2);
trie.lower_subtries[subtrie_3_index] = Some(subtrie_3);
let unchanged_prefix_set = PrefixSetMut::from([
Nibbles::from_nibbles_unchecked([0x0]),
Nibbles::from_nibbles_unchecked([0x2, 0x0, 0x0]),
]);
// Create a prefix set with the keys that match only the second subtrie
let mut prefix_set = PrefixSetMut::from([
// Match second subtrie
Nibbles::from_nibbles_unchecked([0x1, 0x0, 0x0]),
Nibbles::from_nibbles_unchecked([0x1, 0x0, 0x1, 0x0]),
]);
prefix_set.extend(unchanged_prefix_set.clone());
trie.prefix_set = prefix_set;
// Update subtrie hashes
trie.update_subtrie_hashes();
// Check that the prefix set was updated
assert_eq!(trie.prefix_set, unchanged_prefix_set);
// Check that subtries were returned back to the array
assert!(trie.lower_subtries[subtrie_1_index].is_some());
assert!(trie.lower_subtries[subtrie_2_index].is_some());
assert!(trie.lower_subtries[subtrie_3_index].is_some());
}
}

View File

@@ -1924,7 +1924,7 @@ impl<P: BlindedProvider> RevealedSparseTrie<P> {
/// Enum representing sparse trie node type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SparseNodeType {
pub enum SparseNodeType {
/// Empty trie node.
Empty,
/// A placeholder that stores only the hash for a node that has not been fully revealed.
@@ -2101,25 +2101,25 @@ impl RlpNodeBuffers {
}
/// RLP node path stack item.
#[derive(Debug)]
struct RlpNodePathStackItem {
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct RlpNodePathStackItem {
/// Level at which the node is located. Higher numbers correspond to lower levels in the trie.
level: usize,
pub level: usize,
/// Path to the node.
path: Nibbles,
pub path: Nibbles,
/// Whether the path is in the prefix set. If [`None`], then unknown yet.
is_in_prefix_set: Option<bool>,
pub is_in_prefix_set: Option<bool>,
}
/// RLP node stack item.
#[derive(Debug)]
struct RlpNodeStackItem {
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct RlpNodeStackItem {
/// Path to the node.
path: Nibbles,
pub path: Nibbles,
/// RLP node.
rlp_node: RlpNode,
pub rlp_node: RlpNode,
/// Type of the node.
node_type: SparseNodeType,
pub node_type: SparseNodeType,
}
/// Tracks modifications to the sparse trie structure.
@@ -2147,6 +2147,13 @@ impl SparseTrieUpdates {
self.removed_nodes.clear();
self.wiped = false;
}
/// Extends the updates with another set of updates.
pub fn extend(&mut self, other: Self) {
self.updated_nodes.extend(other.updated_nodes);
self.removed_nodes.extend(other.removed_nodes);
self.wiped |= other.wiped;
}
}
#[cfg(test)]