diff --git a/crates/chain-state/src/lazy_overlay.rs b/crates/chain-state/src/lazy_overlay.rs
index b611f29241..58ccee5f90 100644
--- a/crates/chain-state/src/lazy_overlay.rs
+++ b/crates/chain-state/src/lazy_overlay.rs
@@ -123,60 +123,18 @@ impl LazyOverlay {
     /// Merge all blocks' trie data into a single [`TrieInputSorted`].
     ///
-    /// Blocks are ordered newest to oldest. Uses hybrid merge algorithm that
-    /// switches between `extend_ref` (small batches) and k-way merge (large batches).
+    /// Blocks are ordered newest to oldest.
     fn merge_blocks(blocks: &[DeferredTrieData]) -> TrieInputSorted {
-        const MERGE_BATCH_THRESHOLD: usize = 64;
-
         if blocks.is_empty() {
             return TrieInputSorted::default();
         }
 
-        // Single block: use its data directly (no allocation)
-        if blocks.len() == 1 {
-            let data = blocks[0].wait_cloned();
-            return TrieInputSorted {
-                state: data.hashed_state,
-                nodes: data.trie_updates,
-                prefix_sets: Default::default(),
-            };
-        }
+        let state =
+            HashedPostStateSorted::merge_batch(blocks.iter().map(|b| b.wait_cloned().hashed_state));
+        let nodes =
+            TrieUpdatesSorted::merge_batch(blocks.iter().map(|b| b.wait_cloned().trie_updates));
 
-        if blocks.len() < MERGE_BATCH_THRESHOLD {
-            // Small k: extend_ref loop with Arc::make_mut is faster.
-            // Uses copy-on-write - only clones inner data if Arc has multiple refs.
-            // Iterate oldest->newest so newer values override older ones.
-            let mut blocks_iter = blocks.iter().rev();
-            let first = blocks_iter.next().expect("blocks is non-empty");
-            let data = first.wait_cloned();
-
-            let mut state = data.hashed_state;
-            let mut nodes = data.trie_updates;
-
-            for block in blocks_iter {
-                let block_data = block.wait_cloned();
-                Arc::make_mut(&mut state).extend_ref_and_sort(block_data.hashed_state.as_ref());
-                Arc::make_mut(&mut nodes).extend_ref_and_sort(block_data.trie_updates.as_ref());
-            }
-
-            TrieInputSorted { state, nodes, prefix_sets: Default::default() }
-        } else {
-            // Large k: k-way merge is faster (O(n log k)).
-            // Collect is unavoidable here - we need all data materialized for k-way merge.
-            let trie_data: Vec<_> = blocks.iter().map(|b| b.wait_cloned()).collect();
-
-            let merged_state = HashedPostStateSorted::merge_batch(
-                trie_data.iter().map(|d| d.hashed_state.as_ref()),
-            );
-            let merged_nodes =
-                TrieUpdatesSorted::merge_batch(trie_data.iter().map(|d| d.trie_updates.as_ref()));
-
-            TrieInputSorted {
-                state: Arc::new(merged_state),
-                nodes: Arc::new(merged_nodes),
-                prefix_sets: Default::default(),
-            }
-        }
+        TrieInputSorted { state, nodes, prefix_sets: Default::default() }
     }
 }
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index ca8314099c..0fe7d85472 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -560,43 +560,12 @@ impl<TX, N> DatabaseProvider<TX, N>
-        let merged = if num_blocks == 1 {
-            match Arc::try_unwrap(blocks[0].trie_updates()) {
-                Ok(owned) => owned,
-                Err(arc) => (*arc).clone(),
-            }
-        } else if num_blocks < MERGE_BATCH_THRESHOLD {
-            // Small k: extend_ref with Arc::make_mut (copy-on-write).
-            // Blocks are oldest-to-newest, iterate forward so newest overrides.
-            let mut blocks_iter = blocks.iter();
-            let mut result = blocks_iter.next().expect("non-empty").trie_updates();
-
-            for block in blocks_iter {
-                Arc::make_mut(&mut result)
-                    .extend_ref_and_sort(block.trie_updates().as_ref());
-            }
-
-            match Arc::try_unwrap(result) {
-                Ok(owned) => owned,
-                Err(arc) => (*arc).clone(),
-            }
-        } else {
-            // Large k: k-way merge is faster (O(n log k)).
-            // Collect Arcs first to extend lifetime, then pass refs.
-            // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
-            let arcs: Vec<_> = blocks.iter().rev().map(|b| b.trie_updates()).collect();
-            TrieUpdatesSorted::merge_batch(arcs.iter().map(|arc| arc.as_ref()))
-        };
+        // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
+        let merged =
+            TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
 
         if !merged.is_empty() {
             self.write_trie_updates_sorted(&merged)?;
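Both merge paths above bottom out in `kway_merge_sorted`, which already exists in the trie-common crate (the old code called it too); the diff only changes how it is fed. For reference, a minimal standalone sketch of the technique follows. The generics and signature here are illustrative assumptions, not reth's actual definition: input slices are individually sorted by key, ordered newest to oldest, and on duplicate keys the newest entry wins.

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    /// K-way merge of sorted key/value slices, newest slice first.
    fn kway_merge_sorted<'a, K: Ord + Copy + 'a, V: Clone + 'a>(
        slices: impl IntoIterator<Item = &'a [(K, V)]>,
    ) -> Vec<(K, V)> {
        let slices: Vec<&[(K, V)]> = slices.into_iter().collect();
        // Min-heap of (next key, slice index, position within slice).
        let mut heap = BinaryHeap::new();
        for (i, s) in slices.iter().enumerate() {
            if let Some((k, _)) = s.first() {
                heap.push(Reverse((*k, i, 0usize)));
            }
        }
        let mut out: Vec<(K, V)> = Vec::new();
        while let Some(Reverse((key, i, pos))) = heap.pop() {
            // Equal keys pop in ascending slice index, i.e. newest first,
            // so later (older) duplicates are dropped by this check.
            if out.last().map_or(true, |(k, _)| *k != key) {
                out.push(slices[i][pos].clone());
            }
            // Advance this slice's cursor.
            if let Some((k, _)) = slices[i].get(pos + 1) {
                heap.push(Reverse((*k, i, pos + 1)));
            }
        }
        out
    }

Each of the n total entries is pushed and popped at most once on a heap of at most k cursors, which is the O(n log k) bound the doc comments cite; e.g. merging [(1, "new")] with [(1, "old"), (2, "b")] yields [(1, "new"), (2, "b")].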
diff --git a/crates/trie/common/src/hashed_state.rs b/crates/trie/common/src/hashed_state.rs
index 3b232d4346..315bda49a4 100644
--- a/crates/trie/common/src/hashed_state.rs
+++ b/crates/trie/common/src/hashed_state.rs
@@ -638,31 +638,44 @@ impl HashedPostStateSorted {
     /// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
     ///
-    /// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
-    pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
-        let states: Vec<_> = states.into_iter().collect();
-        if states.is_empty() {
-            return Self::default();
+    /// For small batches, uses `extend_ref_and_sort` loop.
+    /// For large batches, uses k-way merge for O(n log k) complexity.
+    pub fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
+        const THRESHOLD: usize = 30;
+
+        let items: alloc::vec::Vec<_> = iter.into_iter().collect();
+        let k = items.len();
+
+        if k == 0 {
+            return Self::default().into();
+        }
+        if k == 1 {
+            return items.into_iter().next().expect("k == 1");
         }
 
-        let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
+        if k < THRESHOLD {
+            // Small k: extend loop, oldest-to-newest so newer overrides older.
+            let mut iter = items.iter().rev();
+            let mut acc = iter.next().expect("k > 0").as_ref().clone();
+            for next in iter {
+                acc.extend_ref_and_sort(next.as_ref());
+            }
+            return acc.into();
+        }
+
+        // Large k: k-way merge.
+        let accounts = kway_merge_sorted(items.iter().map(|i| i.as_ref().accounts.as_slice()));
 
         struct StorageAcc<'a> {
-            /// Account storage was cleared (e.g., SELFDESTRUCT).
             wiped: bool,
-            /// Stop collecting older slices after seeing a wipe.
             sealed: bool,
-            /// Storage slot slices to merge, ordered newest to oldest.
             slices: Vec<&'a [(B256, U256)]>,
         }
 
         let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
 
-        // Accumulate storage slices per address from newest to oldest state.
-        // Once we see a `wiped` flag, the account was cleared at that point,
-        // so older storage slots are irrelevant - we "seal" and stop collecting.
-        for state in &states {
-            for (addr, storage) in &state.storages {
+        for item in &items {
+            for (addr, storage) in &item.as_ref().storages {
                 let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
                     wiped: false,
                     sealed: false,
@@ -689,7 +702,7 @@
             })
             .collect();
 
-        Self { accounts, storages }
+        Self { accounts, storages }.into()
     }
 
     /// Clears all accounts and storage data.
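The `wiped`/`sealed` accumulation kept for the large-k path is easier to see outside the diff. Below is a minimal model of the same one-pass idea, with plain integers standing in for `B256` addresses and `U256` slot values; `Storage` and `collect_slices` are hypothetical names for illustration, not reth API.

    use std::collections::BTreeMap;

    /// One state's view of an account's storage: sorted (slot, value)
    /// pairs plus a flag saying the storage was cleared in this state.
    struct Storage {
        wiped: bool,
        slots: Vec<(u64, u64)>,
    }

    /// States are ordered newest to oldest. For each address, collect
    /// only the storage slices that can still affect the merged result.
    fn collect_slices(states: &[Vec<(u64, Storage)>]) -> Vec<(u64, bool, Vec<&[(u64, u64)]>)> {
        // Per address: (saw a wipe, sealed, slices newest to oldest).
        let mut acc: BTreeMap<u64, (bool, bool, Vec<&[(u64, u64)]>)> = BTreeMap::new();
        for state in states {
            for (addr, storage) in state {
                let entry = acc.entry(*addr).or_insert((false, false, Vec::new()));
                if entry.1 {
                    // A newer state already wiped this account; every older
                    // slice is dead and is skipped without being collected.
                    continue;
                }
                entry.2.push(storage.slots.as_slice());
                if storage.wiped {
                    // Storage was cleared here (e.g. SELFDESTRUCT): record
                    // the wipe, seal the entry, stop collecting older slices.
                    entry.0 = true;
                    entry.1 = true;
                }
            }
        }
        acc.into_iter().map(|(addr, (wiped, _, slices))| (addr, wiped, slices)).collect()
    }

Because states are visited newest to oldest, a wipe seals the address immediately: every older slice belongs to storage that was cleared and can never reach the merged result, so the surviving slices can be handed straight to the k-way merge.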
diff --git a/crates/trie/common/src/updates.rs b/crates/trie/common/src/updates.rs
index 17f0d02b5e..08c62cee3f 100644
--- a/crates/trie/common/src/updates.rs
+++ b/crates/trie/common/src/updates.rs
@@ -629,48 +629,57 @@ impl TrieUpdatesSorted {
     /// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
     ///
-    /// This is more efficient than repeated `extend_ref` calls for large batches,
-    /// using k-way merge for O(n log k) complexity instead of O(n * k).
-    pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
-        let updates: Vec<_> = updates.into_iter().collect();
-        if updates.is_empty() {
-            return Self::default();
+    /// For small batches, uses `extend_ref_and_sort` loop.
+    /// For large batches, uses k-way merge for O(n log k) complexity.
+    pub fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
+        const THRESHOLD: usize = 30;
+
+        let items: alloc::vec::Vec<_> = iter.into_iter().collect();
+        let k = items.len();
+
+        if k == 0 {
+            return Self::default().into();
+        }
+        if k == 1 {
+            return items.into_iter().next().expect("k == 1");
         }
 
-        // Merge account nodes using k-way merge. Newest (index 0) takes precedence.
-        let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
+        if k < THRESHOLD {
+            // Small k: extend loop, oldest-to-newest so newer overrides older.
+            let mut iter = items.iter().rev();
+            let mut acc = iter.next().expect("k > 0").as_ref().clone();
+            for next in iter {
+                acc.extend_ref_and_sort(next.as_ref());
+            }
+            return acc.into();
+        }
+
+        // Large k: k-way merge.
+        let account_nodes =
+            kway_merge_sorted(items.iter().map(|i| i.as_ref().account_nodes.as_slice()));
 
-        // Accumulator for collecting storage trie slices per address.
-        // We process updates newest-to-oldest and stop collecting for an address
-        // once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
         struct StorageAcc<'a> {
-            /// Storage trie was deleted (account removed or cleared).
             is_deleted: bool,
-            /// Stop collecting older slices after seeing a deletion.
             sealed: bool,
-            /// Storage trie node slices to merge, ordered newest to oldest.
             slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
         }
 
         let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
 
-        // Collect storage slices per address, respecting deletion boundaries
-        for update in &updates {
-            for (addr, storage) in &update.storage_tries {
+        for item in &items {
+            for (addr, storage) in &item.as_ref().storage_tries {
                 let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
                     is_deleted: false,
                     sealed: false,
                     slices: Vec::new(),
                 });
 
-                // Skip if we already hit a deletion for this address (older data is irrelevant)
                 if entry.sealed {
                     continue;
                 }
 
                 entry.slices.push(storage.storage_nodes.as_slice());
 
-                // If this storage was deleted, mark as deleted and seal to ignore older updates
                 if storage.is_deleted {
                     entry.is_deleted = true;
                     entry.sealed = true;
@@ -678,7 +687,6 @@
             }
         }
 
-        // Merge each address's storage slices using k-way merge
        let storage_tries = acc
            .into_iter()
            .map(|(addr, entry)| {
@@ -687,7 +695,7 @@
             })
             .collect();
 
-        Self { account_nodes, storage_tries }
+        Self { account_nodes, storage_tries }.into()
     }
 }
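The new `merge_batch<T: AsRef<Self> + From<Self>>(...) -> T` shape is what lets all three call sites above share one implementation: lazy_overlay passes `Arc`s in and gets an `Arc` back, while an owned caller instantiates `T = Self`. One caveat: std provides the reflexive `From<T> for T` but no reflexive `AsRef`, so the owned instantiation presumably relies on an `AsRef<Self>` impl on the concrete types; the sketch below writes that impl out explicitly. `Sorted` is a stand-in type for illustration, not reth code.

    use std::sync::Arc;

    // Stand-in for the real sorted-update types.
    #[derive(Clone, Default)]
    struct Sorted(Vec<(u64, u64)>);

    // Needed for T = Sorted; assumed to exist on the real types.
    impl AsRef<Sorted> for Sorted {
        fn as_ref(&self) -> &Self {
            self
        }
    }

    impl Sorted {
        // Same shape as the new merge_batch signature in the diff.
        fn merge_batch<T: AsRef<Self> + From<Self>>(iter: impl IntoIterator<Item = T>) -> T {
            let items: Vec<T> = iter.into_iter().collect();
            if items.len() == 1 {
                // Pass-through: a lone Arc is returned without touching its contents.
                return items.into_iter().next().expect("len == 1");
            }
            // Toy merge, newest to oldest: first occurrence of a key wins.
            let mut merged = Sorted::default();
            for item in &items {
                for kv in &item.as_ref().0 {
                    if !merged.0.iter().any(|(k, _)| k == &kv.0) {
                        merged.0.push(*kv);
                    }
                }
            }
            merged.0.sort_unstable();
            merged.into()
        }
    }

    fn main() {
        // Owned caller: T = Sorted, `.into()` is the identity conversion.
        let _owned: Sorted = Sorted::merge_batch(vec![Sorted(vec![(1, 1)]), Sorted(vec![(1, 0)])]);
        // Shared caller: T = Arc<Sorted>; Arc supplies both AsRef and From.
        let _shared: Arc<Sorted> = Sorted::merge_batch(vec![Arc::new(Sorted::default())]);
    }

The `k == 1` pass-through is the payoff for `Arc` callers: the lone `Arc` is returned as-is, with no `Arc::try_unwrap`-or-clone dance like the one this PR deletes from provider.rs.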