perf(storage): batch trie updates across blocks in save_blocks (#21142)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: YK <chiayongkang@hotmail.com>
Georgios Konstantopoulos
2026-01-17 07:15:40 +00:00
committed by GitHub
parent 6bf43ab24a
commit c11c13000f
2 changed files with 62 additions and 24 deletions


@@ -10,11 +10,6 @@ use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSort
use std::sync::{Arc, OnceLock};
use tracing::{debug, trace};
-/// Threshold for switching from `extend_ref` loop to `merge_batch`.
-///
-/// Benchmarked crossover: `extend_ref` wins up to ~64 blocks, `merge_batch` wins beyond.
-const MERGE_BATCH_THRESHOLD: usize = 64;
/// Inputs captured for lazy overlay computation.
#[derive(Clone)]
struct LazyOverlayInputs {
@@ -128,44 +123,46 @@ impl LazyOverlay {
/// Merge all blocks' trie data into a single [`TrieInputSorted`].
///
-/// Blocks are ordered newest to oldest. We iterate oldest to newest so that
-/// newer values override older ones.
+/// Blocks are ordered newest to oldest. Uses hybrid merge algorithm that
+/// switches between `extend_ref` (small batches) and k-way merge (large batches).
fn merge_blocks(blocks: &[DeferredTrieData]) -> TrieInputSorted {
+const MERGE_BATCH_THRESHOLD: usize = 64;
if blocks.is_empty() {
return TrieInputSorted::default();
}
-// Single block: use its data directly
+// Single block: use its data directly (no allocation)
if blocks.len() == 1 {
let data = blocks[0].wait_cloned();
return TrieInputSorted {
-state: Arc::clone(&data.hashed_state),
-nodes: Arc::clone(&data.trie_updates),
+state: data.hashed_state,
+nodes: data.trie_updates,
prefix_sets: Default::default(),
};
}
if blocks.len() < MERGE_BATCH_THRESHOLD {
-// Small k: extend_ref loop is faster
-// Iterate oldest->newest so newer values override older ones
+// Small k: extend_ref loop with Arc::make_mut is faster.
+// Uses copy-on-write - only clones inner data if Arc has multiple refs.
+// Iterate oldest->newest so newer values override older ones.
let mut blocks_iter = blocks.iter().rev();
let first = blocks_iter.next().expect("blocks is non-empty");
let data = first.wait_cloned();
-let mut state = Arc::clone(&data.hashed_state);
-let mut nodes = Arc::clone(&data.trie_updates);
-let state_mut = Arc::make_mut(&mut state);
-let nodes_mut = Arc::make_mut(&mut nodes);
+let mut state = data.hashed_state;
+let mut nodes = data.trie_updates;
for block in blocks_iter {
-let data = block.wait_cloned();
-state_mut.extend_ref(data.hashed_state.as_ref());
-nodes_mut.extend_ref(data.trie_updates.as_ref());
+let block_data = block.wait_cloned();
+Arc::make_mut(&mut state).extend_ref(block_data.hashed_state.as_ref());
+Arc::make_mut(&mut nodes).extend_ref(block_data.trie_updates.as_ref());
}
TrieInputSorted { state, nodes, prefix_sets: Default::default() }
} else {
-// Large k: merge_batch is faster (O(n log k) via k-way merge)
+// Large k: k-way merge is faster (O(n log k)).
+// Collect is unavoidable here - we need all data materialized for k-way merge.
let trie_data: Vec<_> = blocks.iter().map(|b| b.wait_cloned()).collect();
let merged_state = HashedPostStateSorted::merge_batch(
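
The small-batch branch above builds the merged input by taking the oldest block's data as the accumulator and folding newer blocks into it with `Arc::make_mut`, which deep-clones the accumulator only if its `Arc` is still shared. A minimal standalone sketch of that copy-on-write fold, with a toy `Sorted` map standing in for reth's `HashedPostStateSorted`/`TrieUpdatesSorted` (the type and its `extend_ref` here are illustrative, not the crate's API):

```rust
use std::{collections::BTreeMap, sync::Arc};

/// Toy stand-in for a sorted update set (e.g. trie updates keyed by path).
#[derive(Clone, Debug, Default)]
struct Sorted(BTreeMap<u64, u64>);

impl Sorted {
    /// Merge `other` into `self`; entries from `other` (the newer block) win.
    fn extend_ref(&mut self, other: &Self) {
        for (k, v) in &other.0 {
            self.0.insert(*k, *v);
        }
    }
}

/// Small-batch merge over blocks ordered newest to oldest.
fn merge_small(mut blocks_newest_first: Vec<Arc<Sorted>>) -> Arc<Sorted> {
    // Take ownership of the oldest block's Arc; if nothing else holds it,
    // `Arc::make_mut` below mutates it in place without cloning.
    let mut acc = blocks_newest_first.pop().expect("non-empty");
    // The remaining blocks are visited second-oldest -> newest, so newer values override.
    for block in blocks_newest_first.iter().rev() {
        // Copy-on-write: deep-clones the accumulator only if the Arc is shared.
        Arc::make_mut(&mut acc).extend_ref(block);
    }
    acc
}

fn main() {
    let newest = Arc::new(Sorted(BTreeMap::from([(2, 200), (3, 30)])));
    let oldest = Arc::new(Sorted(BTreeMap::from([(1, 10), (2, 20)])));
    let merged = merge_small(vec![newest, oldest]);
    assert_eq!(merged.0, BTreeMap::from([(1, 10), (2, 200), (3, 30)]));
}
```

This is also why the hunk drops the old `Arc::clone` + upfront `Arc::make_mut` pair: cloning the `Arc` first guaranteed a shared count and therefore a deep copy, whereas moving `data.hashed_state` lets a uniquely owned first block be mutated in place.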


@@ -556,13 +556,54 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let start = Instant::now();
self.write_hashed_state(&trie_data.hashed_state)?;
timings.write_hashed_state += start.elapsed();
-let start = Instant::now();
-self.write_trie_updates_sorted(&trie_data.trie_updates)?;
-timings.write_trie_updates += start.elapsed();
}
}
+// Write all trie updates in a single batch.
+// This reduces cursor open/close overhead from N calls to 1.
+// Uses hybrid algorithm: extend_ref for small batches, k-way merge for large.
+if save_mode.with_state() {
+const MERGE_BATCH_THRESHOLD: usize = 30;
+let start = Instant::now();
+let num_blocks = blocks.len();
+let merged = if num_blocks == 0 {
+TrieUpdatesSorted::default()
+} else if num_blocks == 1 {
+// Single block: use directly (Arc::try_unwrap avoids clone if refcount is 1)
+match Arc::try_unwrap(blocks[0].trie_updates()) {
+Ok(owned) => owned,
+Err(arc) => (*arc).clone(),
+}
+} else if num_blocks < MERGE_BATCH_THRESHOLD {
+// Small k: extend_ref with Arc::make_mut (copy-on-write).
+// Blocks are oldest-to-newest, iterate forward so newest overrides.
+let mut blocks_iter = blocks.iter();
+let mut result = blocks_iter.next().expect("non-empty").trie_updates();
+for block in blocks_iter {
+Arc::make_mut(&mut result).extend_ref(block.trie_updates().as_ref());
+}
+match Arc::try_unwrap(result) {
+Ok(owned) => owned,
+Err(arc) => (*arc).clone(),
+}
+} else {
+// Large k: k-way merge is faster (O(n log k)).
+// Collect Arcs first to extend lifetime, then pass refs.
+// Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
+let arcs: Vec<_> = blocks.iter().rev().map(|b| b.trie_updates()).collect();
+TrieUpdatesSorted::merge_batch(arcs.iter().map(|arc| arc.as_ref()))
+};
+if !merged.is_empty() {
+self.write_trie_updates_sorted(&merged)?;
+}
+timings.write_trie_updates += start.elapsed();
+}
// Full mode: update history indices
if save_mode.with_state() {
let start = Instant::now();
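
Beyond `MERGE_BATCH_THRESHOLD`, both call sites hand all blocks to `merge_batch`, a k-way merge that costs O(n log k) for n entries across k blocks instead of repeatedly re-extending one accumulator. A self-contained sketch of that idea over plain sorted `(key, value)` lists, where the newest block wins on duplicate keys (names and shapes here are illustrative, not the actual `TrieUpdatesSorted::merge_batch` signature):

```rust
use std::{cmp::Reverse, collections::BinaryHeap};

/// K-way merge of per-block sorted `(key, value)` lists.
///
/// `blocks` is ordered newest to oldest; when a key appears in several blocks,
/// the newest block's value wins. Total work is O(n log k).
fn merge_batch(blocks: &[Vec<(u64, u64)>]) -> Vec<(u64, u64)> {
    // Min-heap of cursors: (key, block index, position within that block).
    // For equal keys, the lower block index (newer block) pops first.
    let mut heap: BinaryHeap<Reverse<(u64, usize, usize)>> = blocks
        .iter()
        .enumerate()
        .filter_map(|(i, b)| b.first().map(|&(k, _)| Reverse((k, i, 0))))
        .collect();

    let mut out: Vec<(u64, u64)> = Vec::new();
    while let Some(Reverse((key, block, pos))) = heap.pop() {
        // Emit a key only the first time it is popped, i.e. from the newest block.
        if out.last().map(|&(k, _)| k) != Some(key) {
            out.push((key, blocks[block][pos].1));
        }
        // Advance this block's cursor.
        if let Some(&(next_key, _)) = blocks[block].get(pos + 1) {
            heap.push(Reverse((next_key, block, pos + 1)));
        }
    }
    out
}

fn main() {
    // Newest block first, matching the ordering `merge_batch` is fed above.
    let newest = vec![(2, 200), (3, 30)];
    let oldest = vec![(1, 10), (2, 20)];
    assert_eq!(merge_batch(&[newest, oldest]), vec![(1, 10), (2, 200), (3, 30)]);
}
```

The `Arc::try_unwrap` fallbacks in the provider hunk serve the same goal at the edges: when the single-block or merged updates end up uniquely owned, they are written without a final clone.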