perf(trie): add k-way merge batch optimization for merge_overlay_trie_input (#21080)

YK
2026-01-15 23:22:15 +08:00
committed by GitHub
parent f012b3391e
commit 7d0e7e72de
4 changed files with 298 additions and 27 deletions

View File

@@ -1,5 +1,11 @@
//! Types and traits for validating blocks and payloads.
/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
///
/// Benchmarked crossover: `extend_ref` wins up to ~64 blocks, `merge_batch` wins beyond.
/// Using 64 as the threshold since the two strategies are roughly equal there.
const MERGE_BATCH_THRESHOLD: usize = 64;
use crate::tree::{
cached_state::CachedStateProvider,
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
@@ -40,7 +46,10 @@ use reth_provider::{
StateProvider, StateProviderFactory, StateReader, TrieReader,
};
use reth_revm::db::State;
use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
use reth_trie::{
updates::{TrieUpdates, TrieUpdatesSorted},
HashedPostState, HashedPostStateSorted, StateRoot, TrieInputSorted,
};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::Address;
use std::{
@@ -1012,34 +1021,63 @@ where
Ok((input, block_hash))
}
/// Aggregates multiple in-memory blocks into a single [`TrieInputSorted`] by combining their
/// Aggregates in-memory blocks into a single [`TrieInputSorted`] by combining their
/// state changes.
///
/// The input `blocks` vector is ordered newest -> oldest (see `TreeState::blocks_by_hash`).
/// We iterate it in reverse so we start with the oldest block's trie data and extend forward
/// toward the newest, ensuring newer state takes precedence.
///
/// Uses an `extend_ref` loop for small k and a k-way `merge_batch` for large k.
/// See [`MERGE_BATCH_THRESHOLD`] for crossover point.
fn merge_overlay_trie_input(blocks: &[ExecutedBlock<N>]) -> TrieInputSorted {
let mut input = TrieInputSorted::default();
let mut blocks_iter = blocks.iter().rev().peekable();
if let Some(first) = blocks_iter.next() {
let data = first.trie_data();
input.state = data.hashed_state;
input.nodes = data.trie_updates;
// Only clone and mutate if there are more in-memory blocks.
if blocks_iter.peek().is_some() {
let state_mut = Arc::make_mut(&mut input.state);
let nodes_mut = Arc::make_mut(&mut input.nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
}
if blocks.is_empty() {
return TrieInputSorted::default();
}
input
// Single block: return Arc directly without cloning
if blocks.len() == 1 {
let data = blocks[0].trie_data();
return TrieInputSorted {
state: Arc::clone(&data.hashed_state),
nodes: Arc::clone(&data.trie_updates),
prefix_sets: Default::default(),
};
}
if blocks.len() < MERGE_BATCH_THRESHOLD {
// Small k: extend_ref loop is faster
// Iterate oldest->newest so newer values override older ones
let mut blocks_iter = blocks.iter().rev();
let first = blocks_iter.next().expect("blocks is non-empty");
let data = first.trie_data();
let mut state = Arc::clone(&data.hashed_state);
let mut nodes = Arc::clone(&data.trie_updates);
let state_mut = Arc::make_mut(&mut state);
let nodes_mut = Arc::make_mut(&mut nodes);
for block in blocks_iter {
let data = block.trie_data();
state_mut.extend_ref(data.hashed_state.as_ref());
nodes_mut.extend_ref(data.trie_updates.as_ref());
}
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
} else {
// Large k: merge_batch is faster (O(n log k) via k-way merge)
let trie_data: Vec<_> = blocks.iter().map(|b| b.trie_data()).collect();
let merged_state = HashedPostStateSorted::merge_batch(
trie_data.iter().map(|d| d.hashed_state.as_ref()),
);
let merged_nodes =
TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
TrieInputSorted {
state: Arc::new(merged_state),
nodes: Arc::new(merged_nodes),
prefix_sets: Default::default(),
}
}
}
/// Spawns a background task to compute and sort trie data for the executed block.
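For context on the complexity claims in the doc comments above, here is a standalone sketch (not part of this diff) contrasting the two strategies on plain sorted `(u64, u64)` pairs. `merge_two`, `fold_merge`, and `kway_merge` are hypothetical stand-ins for the `extend_ref` loop and `merge_batch` respectively, assuming only the `itertools` crate:

use itertools::Itertools;
use std::cmp::Ordering;

/// Two-way merge of sorted key/value slices; `newer` wins on duplicate keys.
fn merge_two(newer: &[(u64, u64)], older: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut out = Vec::with_capacity(newer.len() + older.len());
    let (mut i, mut j) = (0, 0);
    while i < newer.len() && j < older.len() {
        match newer[i].0.cmp(&older[j].0) {
            Ordering::Less => { out.push(newer[i]); i += 1; }
            Ordering::Greater => { out.push(older[j]); j += 1; }
            Ordering::Equal => { out.push(newer[i]); i += 1; j += 1; } // newer wins
        }
    }
    out.extend_from_slice(&newer[i..]);
    out.extend_from_slice(&older[j..]);
    out
}

/// Repeated pairwise merging: the accumulator is rebuilt once per slice, so early
/// elements are re-scanned on every later step, roughly O(n * k) work overall.
/// Slices are ordered newest -> oldest; the accumulator (newer data) wins on duplicates.
fn fold_merge(slices: &[Vec<(u64, u64)>]) -> Vec<(u64, u64)> {
    slices.iter().fold(Vec::new(), |acc, next| merge_two(&acc, next))
}

/// K-way merge: every element passes through a size-k heap exactly once, O(n log k).
/// Equal keys are tie-broken by slice index so the newest slice wins after dedup,
/// mirroring `kway_merge_sorted` in this change.
fn kway_merge(slices: &[Vec<(u64, u64)>]) -> Vec<(u64, u64)> {
    slices
        .iter()
        .enumerate()
        .map(|(i, s)| s.iter().map(move |kv| (i, *kv)))
        .kmerge_by(|(ia, a), (ib, b)| (a.0, *ia) < (b.0, *ib))
        .dedup_by(|(_, a), (_, b)| a.0 == b.0)
        .map(|(_, kv)| kv)
        .collect()
}

fn main() {
    // Three "blocks", newest first; key 2 appears in both the newest and oldest slice.
    let blocks: Vec<Vec<(u64, u64)>> = vec![
        vec![(2, 200), (5, 500)], // newest
        vec![(1, 100), (4, 400)],
        vec![(2, 20), (3, 300)],  // oldest
    ];
    assert_eq!(fold_merge(&blocks), kway_merge(&blocks));
    assert_eq!(kway_merge(&blocks), vec![(1, 100), (2, 200), (3, 300), (4, 400), (5, 500)]);
}

Both strategies produce the same result; the threshold only decides which one is cheaper for a given number of in-memory blocks.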

View File

@@ -3,7 +3,7 @@ use core::ops::Not;
use crate::{
added_removed_keys::MultiAddedRemovedKeys,
prefix_set::{PrefixSetMut, TriePrefixSetsMut},
utils::extend_sorted_vec,
utils::{extend_sorted_vec, kway_merge_sorted},
KeyHasher, MultiProofTargets, Nibbles,
};
use alloc::{borrow::Cow, vec::Vec};
@@ -634,6 +634,62 @@ impl HashedPostStateSorted {
}
}
/// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
///
/// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
let states: Vec<_> = states.into_iter().collect();
if states.is_empty() {
return Self::default();
}
let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
struct StorageAcc<'a> {
/// Account storage was cleared (e.g., SELFDESTRUCT).
wiped: bool,
/// Stop collecting older slices after seeing a wipe.
sealed: bool,
/// Storage slot slices to merge, ordered newest to oldest.
slices: Vec<&'a [(B256, U256)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
// Accumulate storage slices per address from newest to oldest state.
// Once we see a `wiped` flag, the account was cleared at that point,
// so older storage slots are irrelevant - we "seal" and stop collecting.
for state in &states {
for (addr, storage) in &state.storages {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
wiped: false,
sealed: false,
slices: Vec::new(),
});
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_slots.as_slice());
if storage.wiped {
entry.wiped = true;
entry.sealed = true;
}
}
}
let storages = acc
.into_iter()
.map(|(addr, entry)| {
let storage_slots = kway_merge_sorted(entry.slices);
(addr, HashedStorageSorted { wiped: entry.wiped, storage_slots })
})
.collect();
Self { accounts, storages }
}
/// Clears all accounts and storage data.
pub fn clear(&mut self) {
self.accounts.clear();
@@ -648,7 +704,7 @@ impl AsRef<Self> for HashedPostStateSorted {
}
/// Sorted hashed storage optimized for iterating during state trie calculation.
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HashedStorageSorted {
/// Sorted collection of updated storage slots. [`U256::ZERO`] indicates a deleted value.
@@ -694,6 +750,21 @@ impl HashedStorageSorted {
// Extend the sorted non-zero valued slots
extend_sorted_vec(&mut self.storage_slots, &other.storage_slots);
}
/// Batch-merge sorted hashed storage. Iterator yields **newest to oldest**.
/// If any update is wiped, data older than the wipe is discarded.
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
let wipe_idx = updates.iter().position(|u| u.wiped);
let relevant = wipe_idx.map_or(&updates[..], |idx| &updates[..=idx]);
let storage_slots = kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()));
Self { wiped: wipe_idx.is_some(), storage_slots }
}
}
impl From<HashedStorageSorted> for HashedStorage {
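For readers skimming the wipe/seal logic above, a small self-contained sketch of the same rule on a simplified stand-in type; `Storage` and `merge_newest_to_oldest` are hypothetical names, not reth types, and the naive inner loop stands in for the `kway_merge_sorted` call used by the real code:

/// Simplified stand-in for a sorted storage update (illustration only).
#[derive(Debug, PartialEq)]
struct Storage {
    wiped: bool,
    slots: Vec<(u64, u64)>, // sorted by slot key
}

/// Updates are ordered newest -> oldest; everything older than the first wiped
/// update is ignored because the account's storage was cleared at that point.
fn merge_newest_to_oldest(updates: &[Storage]) -> Storage {
    let wipe_idx = updates.iter().position(|u| u.wiped);
    let relevant = wipe_idx.map_or(updates, |idx| &updates[..=idx]);
    let mut slots: Vec<(u64, u64)> = Vec::new();
    // Naive accumulation for clarity: entries seen earlier (newer) win on duplicates.
    for update in relevant {
        for &(key, value) in &update.slots {
            if slots.iter().all(|(k, _)| *k != key) {
                slots.push((key, value));
            }
        }
    }
    slots.sort_by_key(|(k, _)| *k);
    Storage { wiped: wipe_idx.is_some(), slots }
}

fn main() {
    let updates = vec![
        Storage { wiped: false, slots: vec![(1, 11)] },         // newest
        Storage { wiped: true, slots: vec![(2, 22)] },          // storage cleared here
        Storage { wiped: false, slots: vec![(3, 33), (1, 1)] }, // oldest: ignored
    ];
    let merged = merge_newest_to_oldest(&updates);
    assert_eq!(merged, Storage { wiped: true, slots: vec![(1, 11), (2, 22)] });
}

The only point is the cut at the first wipe: older slots never make it into the merge, and the result stays marked as wiped.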

View File

@@ -1,4 +1,7 @@
use crate::{utils::extend_sorted_vec, BranchNodeCompact, HashBuilder, Nibbles};
use crate::{
utils::{extend_sorted_vec, kway_merge_sorted},
BranchNodeCompact, HashBuilder, Nibbles,
};
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet},
vec::Vec,
@@ -23,6 +26,15 @@ pub struct TrieUpdates {
}
impl TrieUpdates {
/// Creates a new `TrieUpdates` with pre-allocated capacity.
pub fn with_capacity(account_nodes: usize, storage_tries: usize) -> Self {
Self {
account_nodes: HashMap::with_capacity_and_hasher(account_nodes, Default::default()),
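// Heuristic: reserve roughly a quarter of the account-node capacity for removed nodes.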
removed_nodes: HashSet::with_capacity_and_hasher(account_nodes / 4, Default::default()),
storage_tries: B256Map::with_capacity_and_hasher(storage_tries, Default::default()),
}
}
/// Returns `true` if the updates are empty.
pub fn is_empty(&self) -> bool {
self.account_nodes.is_empty() &&
@@ -611,6 +623,69 @@ impl TrieUpdatesSorted {
self.account_nodes.clear();
self.storage_tries.clear();
}
/// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
///
/// This is more efficient than repeated `extend_ref` calls for large batches,
/// using k-way merge for O(n log k) complexity instead of O(n * k).
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
// Merge account nodes using k-way merge. Newest (index 0) takes precedence.
let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
// Accumulator for collecting storage trie slices per address.
// We process updates newest-to-oldest and stop collecting for an address
// once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
struct StorageAcc<'a> {
/// Storage trie was deleted (account removed or cleared).
is_deleted: bool,
/// Stop collecting older slices after seeing a deletion.
sealed: bool,
/// Storage trie node slices to merge, ordered newest to oldest.
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
// Collect storage slices per address, respecting deletion boundaries
for update in &updates {
for (addr, storage) in &update.storage_tries {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
is_deleted: false,
sealed: false,
slices: Vec::new(),
});
// Skip if we already hit a deletion for this address (older data is irrelevant)
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_nodes.as_slice());
// If this storage was deleted, mark as deleted and seal to ignore older updates
if storage.is_deleted {
entry.is_deleted = true;
entry.sealed = true;
}
}
}
// Merge each address's storage slices using k-way merge
let storage_tries = acc
.into_iter()
.map(|(addr, entry)| {
let storage_nodes = kway_merge_sorted(entry.slices);
(addr, StorageTrieUpdatesSorted { is_deleted: entry.is_deleted, storage_nodes })
})
.collect();
Self { account_nodes, storage_tries }
}
}
impl AsRef<Self> for TrieUpdatesSorted {
@@ -702,6 +777,22 @@ impl StorageTrieUpdatesSorted {
extend_sorted_vec(&mut self.storage_nodes, &other.storage_nodes);
self.is_deleted = self.is_deleted || other.is_deleted;
}
/// Batch-merge sorted storage trie updates. Iterator yields **newest to oldest**.
/// If any update is deleted, data older than the deletion is discarded.
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
// Discard updates older than the first deletion since the trie was wiped at that point.
let del_idx = updates.iter().position(|u| u.is_deleted);
let relevant = del_idx.map_or(&updates[..], |idx| &updates[..=idx]);
let storage_nodes = kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()));
Self { is_deleted: del_idx.is_some(), storage_nodes }
}
}
/// Excludes empty nibbles from the given iterator.

View File

@@ -1,7 +1,33 @@
use alloc::vec::Vec;
use core::cmp::Ordering;
use itertools::Itertools;
/// Helper function to extend a sorted vector with another sorted vector.
/// Merge sorted slices into a sorted `Vec`. First occurrence wins for duplicate keys.
///
/// Callers pass slices in priority order (index 0 = highest priority), so the first
/// slice's value for a key takes precedence over later slices.
pub(crate) fn kway_merge_sorted<'a, K, V>(
slices: impl IntoIterator<Item = &'a [(K, V)]>,
) -> Vec<(K, V)>
where
K: Ord + Clone + 'a,
V: Clone + 'a,
{
slices
.into_iter()
.filter(|s| !s.is_empty())
.enumerate()
// Merge by reference: (priority, &K, &V) - avoids cloning all elements upfront
.map(|(i, s)| s.iter().map(move |(k, v)| (i, k, v)))
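// Tie-break equal keys by slice index so the highest-priority (lowest-index) slice sorts first; dedup below keeps that occurrence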
.kmerge_by(|(i1, k1, _), (i2, k2, _)| (k1, i1) < (k2, i2))
.dedup_by(|(_, k1, _), (_, k2, _)| *k1 == *k2)
// Clone only surviving elements after dedup
.map(|(_, k, v)| (k.clone(), v.clone()))
.collect()
}
/// Extend a sorted vector with another sorted vector.
///
/// Values from `other` take precedence for duplicate keys.
pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
@@ -52,4 +78,49 @@ mod tests {
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c_new")]);
}
#[test]
fn test_kway_merge_sorted_basic() {
let slice1 = vec![(1, "a1"), (3, "c1")];
let slice2 = vec![(2, "b2"), (3, "c2")];
let slice3 = vec![(1, "a3"), (4, "d3")];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
// First occurrence wins: key 1 -> a1 (slice1), key 3 -> c1 (slice1)
assert_eq!(result, vec![(1, "a1"), (2, "b2"), (3, "c1"), (4, "d3")]);
}
#[test]
fn test_kway_merge_sorted_empty_slices() {
let slice1: Vec<(i32, &str)> = vec![];
let slice2 = vec![(1, "a")];
let slice3: Vec<(i32, &str)> = vec![];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
assert_eq!(result, vec![(1, "a")]);
}
#[test]
fn test_kway_merge_sorted_all_same_key() {
let slice1 = vec![(5, "first")];
let slice2 = vec![(5, "middle")];
let slice3 = vec![(5, "last")];
let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
// First occurrence wins (slice1 has highest priority)
assert_eq!(result, vec![(5, "first")]);
}
#[test]
fn test_kway_merge_sorted_single_slice() {
let slice = vec![(1, "a"), (2, "b"), (3, "c")];
let result = kway_merge_sorted([slice.as_slice()]);
assert_eq!(result, vec![(1, "a"), (2, "b"), (3, "c")]);
}
#[test]
fn test_kway_merge_sorted_no_slices() {
let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
assert!(result.is_empty());
}
}