Mirror of https://github.com/paradigmxyz/reth.git, synced 2026-02-19 03:04:27 -05:00
chore(trie): Move hybrid check for trie input merges into common code (#21198)
@@ -123,60 +123,18 @@ impl LazyOverlay {
     /// Merge all blocks' trie data into a single [`TrieInputSorted`].
     ///
-    /// Blocks are ordered newest to oldest. Uses hybrid merge algorithm that
-    /// switches between `extend_ref` (small batches) and k-way merge (large batches).
+    /// Blocks are ordered newest to oldest.
     fn merge_blocks(blocks: &[DeferredTrieData]) -> TrieInputSorted {
-        const MERGE_BATCH_THRESHOLD: usize = 64;
-
         if blocks.is_empty() {
             return TrieInputSorted::default();
         }

         // Single block: use its data directly (no allocation)
         if blocks.len() == 1 {
             let data = blocks[0].wait_cloned();
             return TrieInputSorted {
                 state: data.hashed_state,
                 nodes: data.trie_updates,
                 prefix_sets: Default::default(),
             };
         }
+        let state =
+            HashedPostStateSorted::merge_batch(blocks.iter().map(|b| b.wait_cloned().hashed_state));
+        let nodes =
+            TrieUpdatesSorted::merge_batch(blocks.iter().map(|b| b.wait_cloned().trie_updates));

-        if blocks.len() < MERGE_BATCH_THRESHOLD {
-            // Small k: extend_ref loop with Arc::make_mut is faster.
-            // Uses copy-on-write - only clones inner data if Arc has multiple refs.
-            // Iterate oldest->newest so newer values override older ones.
-            let mut blocks_iter = blocks.iter().rev();
-            let first = blocks_iter.next().expect("blocks is non-empty");
-            let data = first.wait_cloned();
-
-            let mut state = data.hashed_state;
-            let mut nodes = data.trie_updates;
-
-            for block in blocks_iter {
-                let block_data = block.wait_cloned();
-                Arc::make_mut(&mut state).extend_ref_and_sort(block_data.hashed_state.as_ref());
-                Arc::make_mut(&mut nodes).extend_ref_and_sort(block_data.trie_updates.as_ref());
-            }
-
-            TrieInputSorted { state, nodes, prefix_sets: Default::default() }
-        } else {
-            // Large k: k-way merge is faster (O(n log k)).
-            // Collect is unavoidable here - we need all data materialized for k-way merge.
-            let trie_data: Vec<_> = blocks.iter().map(|b| b.wait_cloned()).collect();
-
-            let merged_state = HashedPostStateSorted::merge_batch(
-                trie_data.iter().map(|d| d.hashed_state.as_ref()),
-            );
-            let merged_nodes =
-                TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
-
-            TrieInputSorted {
-                state: Arc::new(merged_state),
-                nodes: Arc::new(merged_nodes),
-                prefix_sets: Default::default(),
-            }
-        }
+        TrieInputSorted { state, nodes, prefix_sets: Default::default() }
     }
 }
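The simplified `merge_blocks` relies on `merge_batch` being generic over `T: AsRef<Self> + From<Self>`: feeding it `Arc`-wrapped values returns an `Arc` with no unwrap or re-wrap at the call site, since `Arc<T>: AsRef<T> + From<T>` in std. A minimal, self-contained sketch of that signature pattern with a toy `Sorted` type (not reth's; the override-by-key merge inside is only illustrative):

    use std::sync::Arc;

    #[derive(Clone, Default, Debug, PartialEq)]
    struct Sorted(Vec<(u64, u64)>); // sorted (key, value) pairs

    impl AsRef<Sorted> for Sorted {
        fn as_ref(&self) -> &Sorted {
            self
        }
    }

    impl Sorted {
        /// Merge inputs ordered newest to oldest; newer values win on key clashes.
        fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
            let items: Vec<T> = iter.into_iter().collect();
            if items.is_empty() {
                return Self::default().into();
            }
            if items.len() == 1 {
                return items.into_iter().next().expect("len == 1");
            }
            // Start from the oldest input and layer newer ones on top.
            let mut it = items.iter().rev();
            let mut acc = it.next().expect("non-empty").as_ref().clone();
            for newer in it {
                for &(key, val) in &newer.as_ref().0 {
                    match acc.0.binary_search_by_key(&key, |&(k, _)| k) {
                        Ok(i) => acc.0[i].1 = val, // newer value overrides
                        Err(i) => acc.0.insert(i, (key, val)),
                    }
                }
            }
            acc.into()
        }
    }

    fn main() {
        // Passing Arcs yields an Arc back: `Arc<T>: AsRef<T> + From<T>` in std.
        let newest = Arc::new(Sorted(vec![(2, 99)]));
        let oldest = Arc::new(Sorted(vec![(1, 10), (2, 20)]));
        let merged: Arc<Sorted> = Sorted::merge_batch([newest, oldest]);
        assert_eq!(merged.0, vec![(1, 10), (2, 99)]);

        // Owned values flow through the same signature via the blanket From<T> for T.
        let owned: Sorted = Sorted::merge_batch([Sorted(vec![(7, 1)])]);
        assert_eq!(owned.0, vec![(7, 1)]);
    }

This one generic signature is what lets the same `merge_batch` serve both the `Arc`-holding callers above and plain owned merges.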
@@ -560,43 +560,12 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
         // Write all trie updates in a single batch.
         // This reduces cursor open/close overhead from N calls to 1.
-        // Uses hybrid algorithm: extend_ref for small batches, k-way merge for large.
         if save_mode.with_state() {
-            const MERGE_BATCH_THRESHOLD: usize = 30;
-
             let start = Instant::now();
             let num_blocks = blocks.len();

-            let merged = if num_blocks == 0 {
-                TrieUpdatesSorted::default()
-            } else if num_blocks == 1 {
-                // Single block: use directly (Arc::try_unwrap avoids clone if refcount is 1)
-                match Arc::try_unwrap(blocks[0].trie_updates()) {
-                    Ok(owned) => owned,
-                    Err(arc) => (*arc).clone(),
-                }
-            } else if num_blocks < MERGE_BATCH_THRESHOLD {
-                // Small k: extend_ref with Arc::make_mut (copy-on-write).
-                // Blocks are oldest-to-newest, iterate forward so newest overrides.
-                let mut blocks_iter = blocks.iter();
-                let mut result = blocks_iter.next().expect("non-empty").trie_updates();
-
-                for block in blocks_iter {
-                    Arc::make_mut(&mut result)
-                        .extend_ref_and_sort(block.trie_updates().as_ref());
-                }
-
-                match Arc::try_unwrap(result) {
-                    Ok(owned) => owned,
-                    Err(arc) => (*arc).clone(),
-                }
-            } else {
-                // Large k: k-way merge is faster (O(n log k)).
-                // Collect Arcs first to extend lifetime, then pass refs.
-                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
-                let arcs: Vec<_> = blocks.iter().rev().map(|b| b.trie_updates()).collect();
-                TrieUpdatesSorted::merge_batch(arcs.iter().map(|arc| arc.as_ref()))
-            };
+            // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
+            let merged =
+                TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));

             if !merged.is_empty() {
                 self.write_trie_updates_sorted(&merged)?;
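The deleted provider-side branches leaned on two standard-library `Arc` idioms that the comments name: `Arc::make_mut` for copy-on-write extension and `Arc::try_unwrap` for clone-free ownership recovery when the refcount is 1. A std-only sketch with toy data showing both behaviors:

    use std::sync::Arc;

    fn main() {
        // Arc::make_mut mutates in place while the Arc is unique...
        let mut unique = Arc::new(vec![1, 2]);
        Arc::make_mut(&mut unique).push(3); // no clone: refcount is 1

        // ...and clones the inner data once another reference exists.
        let shared = Arc::new(vec![1, 2]);
        let mut cow = Arc::clone(&shared);
        Arc::make_mut(&mut cow).push(3); // copy-on-write: `shared` is untouched
        assert_eq!(*shared, vec![1, 2]);
        assert_eq!(*cow, vec![1, 2, 3]);

        // Arc::try_unwrap recovers the owned value clone-free iff refcount == 1.
        let owned = match Arc::try_unwrap(unique) {
            Ok(v) => v,                 // sole owner: no clone
            Err(arc) => (*arc).clone(), // still shared: fall back to a clone
        };
        assert_eq!(owned, vec![1, 2, 3]);
    }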
@@ -638,31 +638,44 @@ impl HashedPostStateSorted {
     /// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
     ///
-    /// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
-    pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
-        let states: Vec<_> = states.into_iter().collect();
-        if states.is_empty() {
-            return Self::default();
+    /// For small batches, uses `extend_ref_and_sort` loop.
+    /// For large batches, uses k-way merge for O(n log k) complexity.
+    pub fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
+        const THRESHOLD: usize = 30;
+
+        let items: alloc::vec::Vec<_> = iter.into_iter().collect();
+        let k = items.len();
+
+        if k == 0 {
+            return Self::default().into();
         }
+        if k == 1 {
+            return items.into_iter().next().expect("k == 1");
+        }

-        let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
+        if k < THRESHOLD {
+            // Small k: extend loop, oldest-to-newest so newer overrides older.
+            let mut iter = items.iter().rev();
+            let mut acc = iter.next().expect("k > 0").as_ref().clone();
+            for next in iter {
+                acc.extend_ref_and_sort(next.as_ref());
+            }
+            return acc.into();
+        }
+
+        // Large k: k-way merge.
+        let accounts = kway_merge_sorted(items.iter().map(|i| i.as_ref().accounts.as_slice()));

         struct StorageAcc<'a> {
             /// Account storage was cleared (e.g., SELFDESTRUCT).
             wiped: bool,
             /// Stop collecting older slices after seeing a wipe.
             sealed: bool,
             /// Storage slot slices to merge, ordered newest to oldest.
             slices: Vec<&'a [(B256, U256)]>,
         }

         let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();

         // Accumulate storage slices per address from newest to oldest state.
         // Once we see a `wiped` flag, the account was cleared at that point,
         // so older storage slots are irrelevant - we "seal" and stop collecting.
-        for state in &states {
-            for (addr, storage) in &state.storages {
+        for item in &items {
+            for (addr, storage) in &item.as_ref().storages {
                 let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
                     wiped: false,
                     sealed: false,
@@ -689,7 +702,7 @@ impl HashedPostStateSorted {
             })
             .collect();

-        Self { accounts, storages }
+        Self { accounts, storages }.into()
     }

     /// Clears all accounts and storage data.
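The `kway_merge_sorted` helper itself is outside this diff. A hedged sketch of the contract the callers above depend on, assuming sorted `(key, value)` slices ordered newest to oldest, a heap-based O(n log k) merge, and the newest slice winning on duplicate keys:

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    fn kway_merge_sorted<K: Ord + Copy, V: Clone>(slices: &[&[(K, V)]]) -> Vec<(K, V)> {
        // Min-heap of (key, slice index); on equal keys the smaller index
        // (the newer slice) pops first.
        let mut heap: BinaryHeap<Reverse<(K, usize)>> = BinaryHeap::new();
        let mut cursors = vec![0usize; slices.len()];
        for (i, s) in slices.iter().enumerate() {
            if let Some(&(k, _)) = s.first() {
                heap.push(Reverse((k, i)));
            }
        }

        let mut out: Vec<(K, V)> = Vec::new();
        while let Some(Reverse((k, i))) = heap.pop() {
            // Keep only the first (newest) occurrence of each key.
            if out.last().map_or(true, |&(last, _)| last != k) {
                out.push((k, slices[i][cursors[i]].1.clone()));
            }
            cursors[i] += 1;
            if let Some(&(next, _)) = slices[i].get(cursors[i]) {
                heap.push(Reverse((next, i)));
            }
        }
        out
    }

    fn main() {
        let newest: &[(u8, &str)] = &[(1, "new"), (3, "c")];
        let oldest: &[(u8, &str)] = &[(1, "old"), (2, "b")];
        let merged = kway_merge_sorted(&[newest, oldest]);
        assert_eq!(merged, vec![(1, "new"), (2, "b"), (3, "c")]);
    }

Breaking key ties by slice index is what encodes "newest (index 0) takes precedence": the first occurrence written to the output always comes from the newest input that contains that key.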
@@ -629,48 +629,57 @@ impl TrieUpdatesSorted {
     /// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
     ///
-    /// This is more efficient than repeated `extend_ref` calls for large batches,
-    /// using k-way merge for O(n log k) complexity instead of O(n * k).
-    pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
-        let updates: Vec<_> = updates.into_iter().collect();
-        if updates.is_empty() {
-            return Self::default();
+    /// For small batches, uses `extend_ref_and_sort` loop.
+    /// For large batches, uses k-way merge for O(n log k) complexity.
+    pub fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
+        const THRESHOLD: usize = 30;
+
+        let items: alloc::vec::Vec<_> = iter.into_iter().collect();
+        let k = items.len();
+
+        if k == 0 {
+            return Self::default().into();
         }
+        if k == 1 {
+            return items.into_iter().next().expect("k == 1");
+        }

-        // Merge account nodes using k-way merge. Newest (index 0) takes precedence.
-        let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
+        if k < THRESHOLD {
+            // Small k: extend loop, oldest-to-newest so newer overrides older.
+            let mut iter = items.iter().rev();
+            let mut acc = iter.next().expect("k > 0").as_ref().clone();
+            for next in iter {
+                acc.extend_ref_and_sort(next.as_ref());
+            }
+            return acc.into();
+        }
+
+        // Large k: k-way merge.
+        let account_nodes =
+            kway_merge_sorted(items.iter().map(|i| i.as_ref().account_nodes.as_slice()));

         // Accumulator for collecting storage trie slices per address.
         // We process updates newest-to-oldest and stop collecting for an address
         // once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
         struct StorageAcc<'a> {
             /// Storage trie was deleted (account removed or cleared).
             is_deleted: bool,
             /// Stop collecting older slices after seeing a deletion.
             sealed: bool,
             /// Storage trie node slices to merge, ordered newest to oldest.
             slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
         }

         let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();

         // Collect storage slices per address, respecting deletion boundaries
-        for update in &updates {
-            for (addr, storage) in &update.storage_tries {
+        for item in &items {
+            for (addr, storage) in &item.as_ref().storage_tries {
                 let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
                     is_deleted: false,
                     sealed: false,
                     slices: Vec::new(),
                 });

                 // Skip if we already hit a deletion for this address (older data is irrelevant)
                 if entry.sealed {
                     continue;
                 }

                 entry.slices.push(storage.storage_nodes.as_slice());

                 // If this storage was deleted, mark as deleted and seal to ignore older updates
                 if storage.is_deleted {
                     entry.is_deleted = true;
                     entry.sealed = true;
@@ -678,7 +687,6 @@ impl TrieUpdatesSorted {
             }
         }

         // Merge each address's storage slices using k-way merge
         let storage_tries = acc
             .into_iter()
             .map(|(addr, entry)| {
@@ -687,7 +695,7 @@ impl TrieUpdatesSorted {
             })
             .collect();

-        Self { account_nodes, storage_tries }
+        Self { account_nodes, storage_tries }.into()
     }
 }
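Both `merge_batch` implementations collect storage with the same sealed-accumulator pattern: walk inputs newest to oldest, gather slices per address, and stop at the first wipe/deletion, because nothing older than the wipe can affect the merged result. A self-contained sketch with toy types (the `StorageUpdate` struct and `u8` address keys are stand-ins, not reth's):

    use std::collections::HashMap;

    #[derive(Default)]
    struct StorageAcc<'a> {
        wiped: bool,                   // storage was cleared at some point
        sealed: bool,                  // stop collecting older slices
        slices: Vec<&'a [(u64, u64)]>, // newest to oldest
    }

    struct StorageUpdate {
        wiped: bool,
        slots: Vec<(u64, u64)>,
    }

    // `states` is ordered newest to oldest; each maps address -> storage update.
    fn accumulate<'a>(states: &'a [HashMap<u8, StorageUpdate>]) -> HashMap<u8, StorageAcc<'a>> {
        let mut acc: HashMap<u8, StorageAcc<'a>> = HashMap::new();
        for state in states {
            for (addr, storage) in state {
                let entry = acc.entry(*addr).or_default();
                if entry.sealed {
                    continue; // a newer wipe shadows everything older
                }
                entry.slices.push(storage.slots.as_slice());
                if storage.wiped {
                    entry.wiped = true;
                    entry.sealed = true; // ignore all older updates for this address
                }
            }
        }
        acc
    }

    fn main() {
        let newest = HashMap::from([(0u8, StorageUpdate { wiped: true, slots: vec![(1, 9)] })]);
        let oldest = HashMap::from([(0u8, StorageUpdate { wiped: false, slots: vec![(2, 5)] })]);
        let acc = accumulate(&[newest, oldest]);
        // The wipe in the newest state seals the account: the older slice is dropped.
        assert_eq!(acc[&0].slices.len(), 1);
        assert!(acc[&0].wiped);
    }

The collected per-address slices would then feed the same k-way merge as the account-level data, with the `wiped`/`is_deleted` flag carried into the merged output.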