mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
perf(trie): reuse update action buffers in parallel sparse trie processing (#17352)
This commit is contained in:
@@ -49,6 +49,9 @@ pub struct ParallelSparseTrie {
|
||||
branch_node_tree_masks: HashMap<Nibbles, TrieMask>,
|
||||
/// When a bit is set, the corresponding child is stored as a hash in the database.
|
||||
branch_node_hash_masks: HashMap<Nibbles, TrieMask>,
|
||||
/// Reusable buffer pool used for collecting [`SparseTrieUpdatesAction`]s during hash
|
||||
/// computations.
|
||||
update_actions_buffers: Vec<Vec<SparseTrieUpdatesAction>>,
|
||||
}
|
||||
|
||||
impl Default for ParallelSparseTrie {
|
||||
@@ -63,6 +66,7 @@ impl Default for ParallelSparseTrie {
|
||||
updates: None,
|
||||
branch_node_tree_masks: HashMap::default(),
|
||||
branch_node_hash_masks: HashMap::default(),
|
||||
update_actions_buffers: Vec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -590,15 +594,16 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
// Update subtrie hashes serially if nostd
|
||||
for ChangedSubtrie { index, mut subtrie, mut prefix_set } in subtries {
|
||||
let mut update_actions = self.updates_enabled().then(|| Vec::new());
|
||||
for ChangedSubtrie { index, mut subtrie, mut prefix_set, mut update_actions_buf } in
|
||||
subtries
|
||||
{
|
||||
subtrie.update_hashes(
|
||||
&mut prefix_set,
|
||||
&mut update_actions,
|
||||
&mut update_actions_buf,
|
||||
&self.branch_node_tree_masks,
|
||||
&self.branch_node_hash_masks,
|
||||
);
|
||||
tx.send((index, subtrie, update_actions)).unwrap();
|
||||
tx.send((index, subtrie, update_actions_buf)).unwrap();
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
@@ -609,16 +614,22 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
let branch_node_hash_masks = &self.branch_node_hash_masks;
|
||||
subtries
|
||||
.into_par_iter()
|
||||
.map(|ChangedSubtrie { index, mut subtrie, mut prefix_set }| {
|
||||
let mut update_actions = self.updates_enabled().then(Vec::new);
|
||||
subtrie.update_hashes(
|
||||
&mut prefix_set,
|
||||
&mut update_actions,
|
||||
branch_node_tree_masks,
|
||||
branch_node_hash_masks,
|
||||
);
|
||||
(index, subtrie, update_actions)
|
||||
})
|
||||
.map(
|
||||
|ChangedSubtrie {
|
||||
index,
|
||||
mut subtrie,
|
||||
mut prefix_set,
|
||||
mut update_actions_buf,
|
||||
}| {
|
||||
subtrie.update_hashes(
|
||||
&mut prefix_set,
|
||||
&mut update_actions_buf,
|
||||
branch_node_tree_masks,
|
||||
branch_node_hash_masks,
|
||||
);
|
||||
(index, subtrie, update_actions_buf)
|
||||
},
|
||||
)
|
||||
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
|
||||
}
|
||||
|
||||
@@ -626,8 +637,15 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
|
||||
// Return updated subtries back to the trie after executing any actions required on the
|
||||
// top-level `SparseTrieUpdates`.
|
||||
for (index, subtrie, update_actions) in rx {
|
||||
self.apply_subtrie_update_actions(update_actions);
|
||||
for (index, subtrie, update_actions_buf) in rx {
|
||||
if let Some(mut update_actions_buf) = update_actions_buf {
|
||||
self.apply_subtrie_update_actions(
|
||||
#[allow(clippy::iter_with_drain)]
|
||||
update_actions_buf.drain(..),
|
||||
);
|
||||
self.update_actions_buffers.push(update_actions_buf);
|
||||
}
|
||||
|
||||
self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie);
|
||||
}
|
||||
}
|
||||
@@ -658,6 +676,8 @@ impl SparseTrieInterface for ParallelSparseTrie {
|
||||
}
|
||||
self.prefix_set.clear();
|
||||
self.updates = None;
|
||||
// `update_actions_buffers` doesn't need to be cleared; we want to reuse the Vecs it has
|
||||
// buffered, and all of those are already inherently cleared when they get used.
|
||||
}
|
||||
|
||||
fn find_leaf(
|
||||
@@ -1036,9 +1056,9 @@ impl ParallelSparseTrie {
|
||||
/// the given `updates` set. If the given set is None then this is a no-op.
|
||||
fn apply_subtrie_update_actions(
|
||||
&mut self,
|
||||
update_actions: Option<impl IntoIterator<Item = SparseTrieUpdatesAction>>,
|
||||
update_actions: impl Iterator<Item = SparseTrieUpdatesAction>,
|
||||
) {
|
||||
if let (Some(updates), Some(update_actions)) = (self.updates.as_mut(), update_actions) {
|
||||
if let Some(updates) = self.updates.as_mut() {
|
||||
for action in update_actions {
|
||||
match action {
|
||||
SparseTrieUpdatesAction::InsertRemoved(path) => {
|
||||
@@ -1067,7 +1087,9 @@ impl ParallelSparseTrie {
|
||||
is_in_prefix_set: None,
|
||||
});
|
||||
|
||||
let mut update_actions = self.updates_enabled().then(Vec::new);
|
||||
let mut update_actions_buf =
|
||||
self.updates_enabled().then(|| self.update_actions_buffers.pop().unwrap_or_default());
|
||||
|
||||
while let Some(stack_item) = self.upper_subtrie.inner.buffers.path_stack.pop() {
|
||||
let path = stack_item.path;
|
||||
let node = if path.len() < UPPER_TRIE_MAX_DEPTH {
|
||||
@@ -1092,7 +1114,7 @@ impl ParallelSparseTrie {
|
||||
// Calculate the RLP node for the current node using upper subtrie
|
||||
self.upper_subtrie.inner.rlp_node(
|
||||
prefix_set,
|
||||
&mut update_actions,
|
||||
&mut update_actions_buf,
|
||||
stack_item,
|
||||
node,
|
||||
&self.branch_node_tree_masks,
|
||||
@@ -1102,7 +1124,13 @@ impl ParallelSparseTrie {
|
||||
|
||||
// If there were any branch node updates as a result of calculating the RLP node for the
|
||||
// upper trie then apply them to the top-level set.
|
||||
self.apply_subtrie_update_actions(update_actions);
|
||||
if let Some(mut update_actions_buf) = update_actions_buf {
|
||||
self.apply_subtrie_update_actions(
|
||||
#[allow(clippy::iter_with_drain)]
|
||||
update_actions_buf.drain(..),
|
||||
);
|
||||
self.update_actions_buffers.push(update_actions_buf);
|
||||
}
|
||||
|
||||
debug_assert_eq!(self.upper_subtrie.inner.buffers.rlp_node_stack.len(), 1);
|
||||
self.upper_subtrie.inner.buffers.rlp_node_stack.pop().unwrap().rlp_node
|
||||
@@ -1127,6 +1155,7 @@ impl ParallelSparseTrie {
|
||||
let mut prefix_set_iter = prefix_set_clone.into_iter().copied().peekable();
|
||||
let mut changed_subtries = Vec::new();
|
||||
let mut unchanged_prefix_set = PrefixSetMut::default();
|
||||
let updates_enabled = self.updates_enabled();
|
||||
|
||||
for (index, subtrie) in self.lower_subtries.iter_mut().enumerate() {
|
||||
if let Some(subtrie) =
|
||||
@@ -1173,7 +1202,15 @@ impl ParallelSparseTrie {
|
||||
_ => {}
|
||||
}
|
||||
|
||||
changed_subtries.push(ChangedSubtrie { index, subtrie, prefix_set });
|
||||
let update_actions_buf =
|
||||
updates_enabled.then(|| self.update_actions_buffers.pop().unwrap_or_default());
|
||||
|
||||
changed_subtries.push(ChangedSubtrie {
|
||||
index,
|
||||
subtrie,
|
||||
prefix_set,
|
||||
update_actions_buf,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2168,8 +2205,10 @@ struct ChangedSubtrie {
|
||||
/// Changed subtrie
|
||||
subtrie: Box<SparseSubtrie>,
|
||||
/// Prefix set of keys that belong to the subtrie.
|
||||
#[allow(unused)]
|
||||
prefix_set: PrefixSet,
|
||||
/// Reusable buffer for collecting [`SparseTrieUpdatesAction`]s during computations. Will be
|
||||
/// None if update retention is disabled.
|
||||
update_actions_buf: Option<Vec<SparseTrieUpdatesAction>>,
|
||||
}
|
||||
|
||||
/// Convert first [`UPPER_TRIE_MAX_DEPTH`] nibbles of the path into a lower subtrie index in the
|
||||
@@ -2698,7 +2737,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
subtries
|
||||
.into_iter()
|
||||
.map(|ChangedSubtrie { index, subtrie, prefix_set }| {
|
||||
.map(|ChangedSubtrie { index, subtrie, prefix_set, .. }| {
|
||||
(index, subtrie, prefix_set.iter().copied().collect::<Vec<_>>())
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
@@ -2742,7 +2781,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
subtries
|
||||
.into_iter()
|
||||
.map(|ChangedSubtrie { index, subtrie, prefix_set }| {
|
||||
.map(|ChangedSubtrie { index, subtrie, prefix_set, .. }| {
|
||||
(index, subtrie, prefix_set.all())
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
|
||||
Reference in New Issue
Block a user