From 7d0e7e72de9a00d6cc415def287f2354e9714092 Mon Sep 17 00:00:00 2001
From: YK
Date: Thu, 15 Jan 2026 23:22:15 +0800
Subject: [PATCH] perf(trie): add k-way merge batch optimization for
 merge_overlay_trie_input (#21080)

---
 .../engine/tree/src/tree/payload_validator.rs | 84 ++++++++++++-----
 crates/trie/common/src/hashed_state.rs        | 75 ++++++++++++++-
 crates/trie/common/src/updates.rs             | 93 ++++++++++++++++++-
 crates/trie/common/src/utils.rs               | 73 ++++++++++++++-
 4 files changed, 298 insertions(+), 27 deletions(-)

diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs
index fbc62d7d0e..746b9077f2 100644
--- a/crates/engine/tree/src/tree/payload_validator.rs
+++ b/crates/engine/tree/src/tree/payload_validator.rs
@@ -1,5 +1,11 @@
 //! Types and traits for validating blocks and payloads.
 
+/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
+///
+/// Benchmarked crossover: `extend_ref` wins up to ~64 blocks, `merge_batch` wins beyond.
+/// Using 64 as threshold since they're roughly equal there.
+const MERGE_BATCH_THRESHOLD: usize = 64;
+
 use crate::tree::{
     cached_state::CachedStateProvider,
     error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
@@ -40,7 +46,10 @@ use reth_provider::{
     StateProvider, StateProviderFactory, StateReader, TrieReader,
 };
 use reth_revm::db::State;
-use reth_trie::{updates::TrieUpdates, HashedPostState, StateRoot, TrieInputSorted};
+use reth_trie::{
+    updates::{TrieUpdates, TrieUpdatesSorted},
+    HashedPostState, HashedPostStateSorted, StateRoot, TrieInputSorted,
+};
 use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
 use revm_primitives::Address;
 use std::{
@@ -1012,34 +1021,63 @@ where
         Ok((input, block_hash))
     }
 
-    /// Aggregates multiple in-memory blocks into a single [`TrieInputSorted`] by combining their
+    /// Aggregates in-memory blocks into a single [`TrieInputSorted`] by combining their
     /// state changes.
     ///
    /// The input `blocks` vector is ordered newest -> oldest (see `TreeState::blocks_by_hash`).
-    /// We iterate it in reverse so we start with the oldest block's trie data and extend forward
-    /// toward the newest, ensuring newer state takes precedence.
+    ///
+    /// Uses `extend_ref` loop for small k, k-way `merge_batch` for large k.
+    /// See [`MERGE_BATCH_THRESHOLD`] for crossover point.
     fn merge_overlay_trie_input(blocks: &[ExecutedBlock]) -> TrieInputSorted {
-        let mut input = TrieInputSorted::default();
-        let mut blocks_iter = blocks.iter().rev().peekable();
-
-        if let Some(first) = blocks_iter.next() {
-            let data = first.trie_data();
-            input.state = data.hashed_state;
-            input.nodes = data.trie_updates;
-
-            // Only clone and mutate if there are more in-memory blocks.
-            if blocks_iter.peek().is_some() {
-                let state_mut = Arc::make_mut(&mut input.state);
-                let nodes_mut = Arc::make_mut(&mut input.nodes);
-                for block in blocks_iter {
-                    let data = block.trie_data();
-                    state_mut.extend_ref(data.hashed_state.as_ref());
-                    nodes_mut.extend_ref(data.trie_updates.as_ref());
-                }
-            }
+        if blocks.is_empty() {
+            return TrieInputSorted::default();
         }
 
-        input
+        // Single block: return Arc directly without cloning
+        if blocks.len() == 1 {
+            let data = blocks[0].trie_data();
+            return TrieInputSorted {
+                state: Arc::clone(&data.hashed_state),
+                nodes: Arc::clone(&data.trie_updates),
+                prefix_sets: Default::default(),
+            };
+        }
+
+        if blocks.len() < MERGE_BATCH_THRESHOLD {
+            // Small k: extend_ref loop is faster
+            // Iterate oldest->newest so newer values override older ones
+            let mut blocks_iter = blocks.iter().rev();
+            let first = blocks_iter.next().expect("blocks is non-empty");
+            let data = first.trie_data();
+
+            let mut state = Arc::clone(&data.hashed_state);
+            let mut nodes = Arc::clone(&data.trie_updates);
+            let state_mut = Arc::make_mut(&mut state);
+            let nodes_mut = Arc::make_mut(&mut nodes);
+
+            for block in blocks_iter {
+                let data = block.trie_data();
+                state_mut.extend_ref(data.hashed_state.as_ref());
+                nodes_mut.extend_ref(data.trie_updates.as_ref());
+            }
+
+            TrieInputSorted { state, nodes, prefix_sets: Default::default() }
+        } else {
+            // Large k: merge_batch is faster (O(n log k) via k-way merge)
+            let trie_data: Vec<_> = blocks.iter().map(|b| b.trie_data()).collect();
+
+            let merged_state = HashedPostStateSorted::merge_batch(
+                trie_data.iter().map(|d| d.hashed_state.as_ref()),
+            );
+            let merged_nodes =
+                TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
+
+            TrieInputSorted {
+                state: Arc::new(merged_state),
+                nodes: Arc::new(merged_nodes),
+                prefix_sets: Default::default(),
+            }
+        }
     }
 
     /// Spawns a background task to compute and sort trie data for the executed block.
diff --git a/crates/trie/common/src/hashed_state.rs b/crates/trie/common/src/hashed_state.rs
index 5f771aae0e..283f1d3b69 100644
--- a/crates/trie/common/src/hashed_state.rs
+++ b/crates/trie/common/src/hashed_state.rs
@@ -3,7 +3,7 @@ use core::ops::Not;
 use crate::{
     added_removed_keys::MultiAddedRemovedKeys,
     prefix_set::{PrefixSetMut, TriePrefixSetsMut},
-    utils::extend_sorted_vec,
+    utils::{extend_sorted_vec, kway_merge_sorted},
     KeyHasher, MultiProofTargets, Nibbles,
 };
 use alloc::{borrow::Cow, vec::Vec};
@@ -634,6 +634,62 @@ impl HashedPostStateSorted {
         }
     }
 
+    /// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
+    ///
+    /// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
+    pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
+        let states: Vec<_> = states.into_iter().collect();
+        if states.is_empty() {
+            return Self::default();
+        }
+
+        let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
+
+        struct StorageAcc<'a> {
+            /// Account storage was cleared (e.g., SELFDESTRUCT).
+            wiped: bool,
+            /// Stop collecting older slices after seeing a wipe.
+            sealed: bool,
+            /// Storage slot slices to merge, ordered newest to oldest.
+            slices: Vec<&'a [(B256, U256)]>,
+        }
+
+        let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
+
+        // Accumulate storage slices per address from newest to oldest state.
+        // Once we see a `wiped` flag, the account was cleared at that point,
+        // so older storage slots are irrelevant - we "seal" and stop collecting.
+        for state in &states {
+            for (addr, storage) in &state.storages {
+                let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
+                    wiped: false,
+                    sealed: false,
+                    slices: Vec::new(),
+                });
+
+                if entry.sealed {
+                    continue;
+                }
+
+                entry.slices.push(storage.storage_slots.as_slice());
+                if storage.wiped {
+                    entry.wiped = true;
+                    entry.sealed = true;
+                }
+            }
+        }
+
+        let storages = acc
+            .into_iter()
+            .map(|(addr, entry)| {
+                let storage_slots = kway_merge_sorted(entry.slices);
+                (addr, HashedStorageSorted { wiped: entry.wiped, storage_slots })
+            })
+            .collect();
+
+        Self { accounts, storages }
+    }
+
     /// Clears all accounts and storage data.
     pub fn clear(&mut self) {
         self.accounts.clear();
@@ -648,7 +704,7 @@ impl AsRef<Self> for HashedPostStateSorted {
 }
 
 /// Sorted hashed storage optimized for iterating during state trie calculation.
-#[derive(Clone, Eq, PartialEq, Debug)]
+#[derive(Clone, Eq, PartialEq, Debug, Default)]
 #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
 pub struct HashedStorageSorted {
     /// Sorted collection of updated storage slots. [`U256::ZERO`] indicates a deleted value.
@@ -694,6 +750,21 @@ impl HashedStorageSorted {
         // Extend the sorted non-zero valued slots
         extend_sorted_vec(&mut self.storage_slots, &other.storage_slots);
     }
+
+    /// Batch-merge sorted hashed storage. Iterator yields **newest to oldest**.
+    /// If any update is wiped, prior data is discarded.
+    pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
+        let updates: Vec<_> = updates.into_iter().collect();
+        if updates.is_empty() {
+            return Self::default();
+        }
+
+        let wipe_idx = updates.iter().position(|u| u.wiped);
+        let relevant = wipe_idx.map_or(&updates[..], |idx| &updates[..=idx]);
+        let storage_slots = kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()));
+
+        Self { wiped: wipe_idx.is_some(), storage_slots }
+    }
 }
 
 impl From<HashedStorageSorted> for HashedStorage {
diff --git a/crates/trie/common/src/updates.rs b/crates/trie/common/src/updates.rs
index f515fc20f6..6214d5ec08 100644
--- a/crates/trie/common/src/updates.rs
+++ b/crates/trie/common/src/updates.rs
@@ -1,4 +1,7 @@
-use crate::{utils::extend_sorted_vec, BranchNodeCompact, HashBuilder, Nibbles};
+use crate::{
+    utils::{extend_sorted_vec, kway_merge_sorted},
+    BranchNodeCompact, HashBuilder, Nibbles,
+};
 use alloc::{
     collections::{btree_map::BTreeMap, btree_set::BTreeSet},
     vec::Vec,
@@ -23,6 +26,15 @@ pub struct TrieUpdates {
 }
 
 impl TrieUpdates {
+    /// Creates a new `TrieUpdates` with pre-allocated capacity.
+    pub fn with_capacity(account_nodes: usize, storage_tries: usize) -> Self {
+        Self {
+            account_nodes: HashMap::with_capacity_and_hasher(account_nodes, Default::default()),
+            removed_nodes: HashSet::with_capacity_and_hasher(account_nodes / 4, Default::default()),
+            storage_tries: B256Map::with_capacity_and_hasher(storage_tries, Default::default()),
+        }
+    }
+
     /// Returns `true` if the updates are empty.
     pub fn is_empty(&self) -> bool {
         self.account_nodes.is_empty() &&
@@ -611,6 +623,69 @@ impl TrieUpdatesSorted {
         self.account_nodes.clear();
         self.storage_tries.clear();
     }
+
+    /// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
+    ///
+    /// This is more efficient than repeated `extend_ref` calls for large batches,
+    /// using k-way merge for O(n log k) complexity instead of O(n * k).
+    pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
+        let updates: Vec<_> = updates.into_iter().collect();
+        if updates.is_empty() {
+            return Self::default();
+        }
+
+        // Merge account nodes using k-way merge. Newest (index 0) takes precedence.
+        let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
+
+        // Accumulator for collecting storage trie slices per address.
+        // We process updates newest-to-oldest and stop collecting for an address
+        // once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
+        struct StorageAcc<'a> {
+            /// Storage trie was deleted (account removed or cleared).
+            is_deleted: bool,
+            /// Stop collecting older slices after seeing a deletion.
+            sealed: bool,
+            /// Storage trie node slices to merge, ordered newest to oldest.
+            slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
+        }
+
+        let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
+
+        // Collect storage slices per address, respecting deletion boundaries
+        for update in &updates {
+            for (addr, storage) in &update.storage_tries {
+                let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
+                    is_deleted: false,
+                    sealed: false,
+                    slices: Vec::new(),
+                });
+
+                // Skip if we already hit a deletion for this address (older data is irrelevant)
+                if entry.sealed {
+                    continue;
+                }
+
+                entry.slices.push(storage.storage_nodes.as_slice());
+
+                // If this storage was deleted, mark as deleted and seal to ignore older updates
+                if storage.is_deleted {
+                    entry.is_deleted = true;
+                    entry.sealed = true;
+                }
+            }
+        }
+
+        // Merge each address's storage slices using k-way merge
+        let storage_tries = acc
+            .into_iter()
+            .map(|(addr, entry)| {
+                let storage_nodes = kway_merge_sorted(entry.slices);
+                (addr, StorageTrieUpdatesSorted { is_deleted: entry.is_deleted, storage_nodes })
+            })
+            .collect();
+
+        Self { account_nodes, storage_tries }
+    }
 }
 
 impl AsRef<Self> for TrieUpdatesSorted {
@@ -702,6 +777,22 @@ impl StorageTrieUpdatesSorted {
         extend_sorted_vec(&mut self.storage_nodes, &other.storage_nodes);
         self.is_deleted = self.is_deleted || other.is_deleted;
     }
+
+    /// Batch-merge sorted storage trie updates. Iterator yields **newest to oldest**.
+    /// If any update is deleted, older data is discarded.
+    pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
+        let updates: Vec<_> = updates.into_iter().collect();
+        if updates.is_empty() {
+            return Self::default();
+        }
+
+        // Discard updates older than the first deletion since the trie was wiped at that point.
+        let del_idx = updates.iter().position(|u| u.is_deleted);
+        let relevant = del_idx.map_or(&updates[..], |idx| &updates[..=idx]);
+        let storage_nodes = kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()));
+
+        Self { is_deleted: del_idx.is_some(), storage_nodes }
+    }
 }
 
 /// Excludes empty nibbles from the given iterator.
diff --git a/crates/trie/common/src/utils.rs b/crates/trie/common/src/utils.rs
index a70608ea60..7c1d454a6f 100644
--- a/crates/trie/common/src/utils.rs
+++ b/crates/trie/common/src/utils.rs
@@ -1,7 +1,33 @@
 use alloc::vec::Vec;
 use core::cmp::Ordering;
+use itertools::Itertools;
 
-/// Helper function to extend a sorted vector with another sorted vector.
+/// Merge sorted slices into a sorted `Vec`. First occurrence wins for duplicate keys.
+///
+/// Callers pass slices in priority order (index 0 = highest priority), so the first
+/// slice's value for a key takes precedence over later slices.
+pub(crate) fn kway_merge_sorted<'a, K, V>(
+    slices: impl IntoIterator<Item = &'a [(K, V)]>,
+) -> Vec<(K, V)>
+where
+    K: Ord + Clone + 'a,
+    V: Clone + 'a,
+{
+    slices
+        .into_iter()
+        .filter(|s| !s.is_empty())
+        .enumerate()
+        // Merge by reference: (priority, &K, &V) - avoids cloning all elements upfront
+        .map(|(i, s)| s.iter().map(move |(k, v)| (i, k, v)))
+        .kmerge_by(|(i1, k1, _), (i2, k2, _)| (k1, i1) < (k2, i2))
+        .dedup_by(|(_, k1, _), (_, k2, _)| *k1 == *k2)
+        // Clone only surviving elements after dedup
+        .map(|(_, k, v)| (k.clone(), v.clone()))
+        .collect()
+}
+
+/// Extend a sorted vector with another sorted vector.
+/// Values from `other` take precedence for duplicate keys.
 ///
 /// Values from `other` take precedence for duplicate keys.
 pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
 where
@@ -52,4 +78,49 @@ mod tests {
         extend_sorted_vec(&mut target, &other);
         assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c_new")]);
     }
+
+    #[test]
+    fn test_kway_merge_sorted_basic() {
+        let slice1 = vec![(1, "a1"), (3, "c1")];
+        let slice2 = vec![(2, "b2"), (3, "c2")];
+        let slice3 = vec![(1, "a3"), (4, "d3")];
+
+        let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
+        // First occurrence wins: key 1 -> a1 (slice1), key 3 -> c1 (slice1)
+        assert_eq!(result, vec![(1, "a1"), (2, "b2"), (3, "c1"), (4, "d3")]);
+    }
+
+    #[test]
+    fn test_kway_merge_sorted_empty_slices() {
+        let slice1: Vec<(i32, &str)> = vec![];
+        let slice2 = vec![(1, "a")];
+        let slice3: Vec<(i32, &str)> = vec![];
+
+        let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
+        assert_eq!(result, vec![(1, "a")]);
+    }
+
+    #[test]
+    fn test_kway_merge_sorted_all_same_key() {
+        let slice1 = vec![(5, "first")];
+        let slice2 = vec![(5, "middle")];
+        let slice3 = vec![(5, "last")];
+
+        let result = kway_merge_sorted([slice1.as_slice(), slice2.as_slice(), slice3.as_slice()]);
+        // First occurrence wins (slice1 has highest priority)
+        assert_eq!(result, vec![(5, "first")]);
+    }
+
+    #[test]
+    fn test_kway_merge_sorted_single_slice() {
+        let slice = vec![(1, "a"), (2, "b"), (3, "c")];
+        let result = kway_merge_sorted([slice.as_slice()]);
+        assert_eq!(result, vec![(1, "a"), (2, "b"), (3, "c")]);
+    }
+
+    #[test]
+    fn test_kway_merge_sorted_no_slices() {
+        let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
+        assert!(result.is_empty());
+    }
 }
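
The patch rests on one equivalence: a k-way merge that visits inputs newest to oldest and keeps the first occurrence of each key yields the same result as the old `extend_ref`-style loop that visits them oldest to newest and lets later entries overwrite earlier ones. The standalone Rust sketch below illustrates that equivalence with toy `(key, value)` snapshots; it uses only `std` and the `itertools` crate, and its function names and data are illustrative only, not part of the patch or of the reth API.

// Standalone illustration only - mirrors the merge semantics of the patch, not its types.
use itertools::Itertools;
use std::collections::BTreeMap;

/// A snapshot of (key, value) pairs sorted by key; index 0 of the outer slice is the newest.
type Snapshot = Vec<(u64, &'static str)>;

/// Old-style strategy: walk snapshots oldest -> newest and let newer entries overwrite.
fn extend_oldest_to_newest(snapshots_newest_first: &[Snapshot]) -> Vec<(u64, &'static str)> {
    let mut merged = BTreeMap::new();
    for snapshot in snapshots_newest_first.iter().rev() {
        for &(key, value) in snapshot {
            // Later iterations are newer snapshots, so they overwrite older values.
            merged.insert(key, value);
        }
    }
    merged.into_iter().collect()
}

/// New-style strategy: k-way merge by (key, priority), where priority 0 is the newest
/// snapshot; `dedup_by` then keeps the first (highest-priority) entry per key.
fn kway_merge_newest_first(snapshots_newest_first: &[Snapshot]) -> Vec<(u64, &'static str)> {
    snapshots_newest_first
        .iter()
        .enumerate()
        .map(|(priority, snapshot)| snapshot.iter().map(move |&(key, value)| (priority, key, value)))
        .kmerge_by(|(p1, k1, _), (p2, k2, _)| (k1, p1) < (k2, p2))
        .dedup_by(|(_, k1, _), (_, k2, _)| k1 == k2)
        .map(|(_, key, value)| (key, value))
        .collect()
}

fn main() {
    // Index 0 is the newest snapshot, matching the newest -> oldest block ordering.
    let snapshots = vec![
        vec![(1, "newest"), (3, "newest")],
        vec![(2, "middle"), (3, "middle")],
        vec![(1, "oldest"), (4, "oldest")],
    ];
    // Both strategies agree: newer snapshots win for duplicate keys.
    assert_eq!(extend_oldest_to_newest(&snapshots), kway_merge_newest_first(&snapshots));
    println!("{:?}", kway_merge_newest_first(&snapshots));
}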