Compare commits

9 Commits

Author SHA1 Message Date
joshieDo
d3950bc83f rm benches 2026-01-16 21:05:22 +00:00
joshieDo
3adbfe5b3c refactor(provider): use k-way merge only for hashed state, revert plain state
Benchmarks showed that the parallel sort for plain state caused a regression
on small batches (1-10 blocks). This change:

- Reverts plain state to per-block `write_state` (original behavior)
- Keeps k-way merge for hashed state only (efficient since pre-sorted)
- Adds conditional: single block uses sequential write, multiple blocks
  use k-way merge
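
The conditional described above can be sketched as a small dispatch on the number of collected blocks. This is an illustration only: `HashedWrite` and `choose_hashed_write` are hypothetical names, not identifiers from the change itself.

```rust
/// Hypothetical labels for the write paths described in the commit message.
#[derive(Debug, PartialEq)]
enum HashedWrite {
    /// Nothing was collected, so nothing is written.
    Skip,
    /// Single block: plain sequential write, no merge overhead.
    Sequential,
    /// Multiple blocks: k-way merge for sorted, deduplicated writes.
    KWayMerge,
}

/// Picks a write path from the number of blocks in the batch.
fn choose_hashed_write(block_count: usize) -> HashedWrite {
    match block_count {
        0 => HashedWrite::Skip,
        1 => HashedWrite::Sequential,
        _ => HashedWrite::KWayMerge,
    }
}
```

With this shape, a batch of one block never pays for heap setup, which is where the merge overhead would be hardest to amortize.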

Benchmark results show 2-10% improvement across all batch sizes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 18:17:10 +00:00
Georgios Konstantopoulos
23e7012049 bench(state_merge): increase data sizes for heavy mainnet simulation
Accounts: 2000-5000 per block (up to 500k total)
Storage: 100-200 contracts × 30-50 slots per block (up to 1M slots)

Results with heavy workloads show significant parallel sort wins:
- 300k slots: 26ms seq → 17ms par (35% faster)
- 500k slots: 45ms seq → 28ms par (38% faster)
- 1M slots: 124ms seq → 87ms par (30% faster)
2026-01-16 11:34:05 +00:00
Georgios Konstantopoulos
a41d853a84 bench(state_merge): use realistic mainnet data sizes
Updated benchmark parameters to simulate real mainnet activity:
- Accounts: 500-1000 per block (vs 100 before)
- Storage: 50-100 contracts × 20-30 slots per block

Results show parallel sort provides significant speedups:
- Account merge: ~12% faster at 100k accounts
- Storage merge: ~32-37% faster at 50k-300k slots

This validates the switch from k-way merge to parallel sort.
2026-01-16 11:24:20 +00:00
Georgios Konstantopoulos
435e07f67b perf(persistence): use parallel sort for batched state writes
Replaces k-way merge with flatten + parallel sort (rayon) for
write_state_changes_merged. Benchmarks show this approach has lower
overhead for typical block counts (10-100 blocks) due to:

- Better cache locality from sequential memory access
- Rayon's parallel sort scales with CPU cores
- Avoids BinaryHeap per-element overhead

The k-way merge approach was ~2x slower than flatten+sort for storage
writes due to heap operation overhead outweighing the theoretical
O(n log k) vs O(n log n) advantage.
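
To put the theoretical advantage in numbers, here is a rough sketch that estimates comparison counts for both strategies. The function name is hypothetical, and the estimates deliberately ignore constant factors, cache effects, and parallelism, which are exactly what the benchmark above says dominated in practice.

```rust
/// Rough comparison-count estimates for combining `n` entries that come
/// from `k` sorted per-block lists. Returns (k-way merge, flatten + sort).
fn comparison_estimates(n: f64, k: f64) -> (f64, f64) {
    let k_way_merge = n * k.log2(); // O(n log k)
    let flatten_sort = n * n.log2(); // O(n log n)
    (k_way_merge, flatten_sort)
}
```

For n = 1,000,000 entries and k = 100 blocks, the sort estimate is about three times the merge estimate, so a measured 2x slowdown for the merge implies per-element heap costs large enough to cancel that gap.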

Also adds microbenchmarks in benches/state_merge.rs to validate
performance of different merge strategies.
2026-01-16 11:10:27 +00:00
joshieDo
76fe19f11a Merge branch 'main' into joshie/batch-hashed-writes 2026-01-16 10:02:13 +00:00
joshieDo
d81e9c9567 perf(provider): batch plain state writes in save_blocks
Add write_state_changes_merged to batch PlainAccountState, PlainStorageState,
and Bytecodes writes across multiple blocks using flatten-sort-dedupe approach.

This complements the existing write_hashed_state_merged optimization by applying
the same sequential I/O pattern to plain state tables:

- Accounts: flattened, sorted by address, latest block wins on duplicates
- Bytecodes: flattened, sorted by hash, latest block wins
- Storage: handles wipe_storage flag (latest wipe invalidates earlier blocks),
  then flattens slots, sorts by (address, slot), writes only latest value
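
A minimal sketch of the flatten-sort-dedupe idea, assuming each block contributes at most one entry per key. `flatten_sort_dedupe` is a hypothetical helper over generic keys, not the signature of `write_state_changes_merged`; a tuple key such as (address, slot) covers the storage case.

```rust
/// Flattens per-block updates, sorts them by key, and keeps only the value
/// written by the latest block for each key.
fn flatten_sort_dedupe<K: Ord + Copy, V: Copy>(blocks: &[Vec<(K, V)>]) -> Vec<(K, V)> {
    // Tag every entry with its block index so duplicates can be resolved.
    let mut flat: Vec<(K, usize, V)> = blocks
        .iter()
        .enumerate()
        .flat_map(|(block_idx, block)| block.iter().map(move |&(k, v)| (k, block_idx, v)))
        .collect();

    // Sort by key, then by descending block index, so the newest entry for
    // a key comes first within its run of duplicates.
    flat.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));

    // dedup_by_key keeps the first element of each run of equal keys.
    flat.dedup_by_key(|entry| entry.0);

    flat.into_iter().map(|(k, _, v)| (k, v)).collect()
}
```

The output is in ascending key order, which is what lets a database cursor move forward only while writing.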

Also extracts write_receipts from write_state so receipts can be written
per-block while state changes are batched.

Expected impact: reduced I/O contention and improved write locality during
block persistence.

Amp-Thread-ID: https://ampcode.com/threads/T-019bc2f3-a568-73fe-8a6f-347d6bc9f386
Co-authored-by: Amp <amp@ampcode.com>
2026-01-15 18:53:10 +00:00
Georgios Konstantopoulos
f3fe4606ad perf(provider): use k-way merge for batched hashed state writes
Implements a k-way merge algorithm for writing hashed state from multiple
blocks in save_blocks. This replaces the previous extend_ref approach
which cloned data into a combined vec.

Key improvements:
- No cloning: uses Arc references to per-block hashed state
- Deduplication: same key across blocks is written only once (last wins)
- Sequential I/O: writes in globally sorted key order across all blocks
- Streaming: uses BinaryHeap for O(n log k) merge without materializing a combined vec

For hashed accounts:
- Merges sorted account lists from all blocks
- Deduplicates by hashed address, taking latest block's value
- Streams directly to MDBX cursor
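
The account merge above can be sketched with the standard library alone. This is a simplified illustration: keys are `u64` instead of hashed addresses, the result is collected into a `Vec` rather than streamed to a cursor, and `merge_last_wins` is a hypothetical name. It assumes every per-block list is sorted by key with no repeated keys inside a block.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges per-block lists that are each sorted by key, keeping only the
/// value from the highest block index when a key appears more than once.
fn merge_last_wins(blocks: &[Vec<(u64, u32)>]) -> Vec<(u64, u32)> {
    // Heap items: (Reverse(key), block_idx, position in that block's list).
    // Reverse(key) makes the smallest key pop first; on equal keys the
    // larger block_idx pops first, so the first entry seen is the latest.
    let mut heap = BinaryHeap::new();
    for (block_idx, block) in blocks.iter().enumerate() {
        if let Some(&(key, _)) = block.first() {
            heap.push((Reverse(key), block_idx, 0usize));
        }
    }

    let mut out: Vec<(u64, u32)> = Vec::new();
    while let Some((Reverse(key), block_idx, pos)) = heap.pop() {
        // Refill the heap from the list this entry came from.
        if let Some(&(next_key, _)) = blocks[block_idx].get(pos + 1) {
            heap.push((Reverse(next_key), block_idx, pos + 1));
        }
        // Entries for a key that was already emitted come from older
        // blocks and are skipped.
        if out.last().map(|&(k, _)| k) != Some(key) {
            out.push((key, blocks[block_idx][pos].1));
        }
    }
    out
}
```

The heap never holds more than one entry per block, which is where the O(n log k) bound comes from.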

For hashed storage:
- Collects all (address, block_idx, storage) tuples
- Sorts by address, then groups for per-address merge
- Handles storage wipes correctly (latest wipe invalidates earlier)
- K-way merge within each address's storage slots
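
The wipe rule for a single address can be sketched as follows. `BlockStorage` and `apply_wipe` are hypothetical stand-ins for illustration; the input is assumed to be ordered from oldest to newest block.

```rust
/// One block's storage update for a single account (hypothetical shape).
struct BlockStorage {
    /// True if this block wiped the account's storage.
    wiped: bool,
    /// (slot, value) pairs, sorted by slot.
    slots: Vec<(u64, u64)>,
}

/// Returns whether existing storage must be cleared first, and the blocks
/// whose slots still apply once the latest wipe is taken into account.
fn apply_wipe(per_block: &[BlockStorage]) -> (bool, &[BlockStorage]) {
    // Search from the newest block backwards for the latest wipe.
    match per_block.iter().rposition(|block| block.wiped) {
        // Everything before the wiping block is invalidated.
        Some(idx) => (true, &per_block[idx..]),
        None => (false, per_block),
    }
}
```

The wiping block itself is kept, since slots it writes are set after the wipe takes effect.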

This reduces both allocation overhead and redundant database writes,
addressing the 21% pagecache_get_page and 37.5% page fault overhead
observed in Persistence thread profiling.
2026-01-15 18:23:59 +00:00
Georgios Konstantopoulos
cf8f79c863 perf(provider): batch hashed state writes across blocks
Batches write_hashed_state calls across all blocks in save_blocks to
improve I/O sequentiality.

Previously, each block's hashed state was written separately, causing
random I/O as keys from different blocks interleave in the B-tree.
For example, block N+1's hashed addresses might sort before block N's,
causing the cursor to jump backwards.

This change collects all blocks' hashed state into a single sorted
collection using extend_ref before writing, enabling sequential I/O.
This follows the same pattern already used for transaction hash writes.

Expected impact: Reduced page faults in the Persistence thread where
profiling showed 21% of time in pagecache_get_page/xas_load and 37.5%
in page fault handling.
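
The cursor behaviour described above can be illustrated with a toy model that counts how often a write sequence steps to a smaller key. The helper names are hypothetical, and plain `u64` keys stand in for hashed addresses; real page-fault behaviour depends on much more than key order.

```rust
/// Counts how often a write sequence moves to a smaller key than the one
/// before it, a rough proxy for a cursor jumping backwards in a B-tree.
fn backward_jumps(keys: &[u64]) -> usize {
    keys.windows(2).filter(|pair| pair[1] < pair[0]).count()
}

/// Per-block order: block N's keys, followed by block N+1's keys.
fn per_block_order(blocks: &[Vec<u64>]) -> Vec<u64> {
    blocks.iter().flatten().copied().collect()
}

/// Batched order: all keys combined, sorted, with duplicates removed.
fn batched_order(blocks: &[Vec<u64>]) -> Vec<u64> {
    let mut keys = per_block_order(blocks);
    keys.sort_unstable();
    keys.dedup();
    keys
}
```

For two blocks with keys `[10, 50, 90]` and `[5, 50, 70]`, the per-block order jumps backwards once (from 90 to 5), while the batched order never does and also writes key 50 only once.
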
2026-01-15 18:07:43 +00:00

@@ -28,7 +28,7 @@ use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
keccak256,
map::{hash_map, HashMap, HashSet},
- Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
+ Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
};
use itertools::Itertools;
use parking_lot::RwLock;
@@ -67,7 +67,8 @@ use reth_trie::{
changesets::storage_trie_wiped_changeset_iter,
trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorIter, TrieStorageCursor},
updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
- HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
+ HashedPostStateSorted, HashedStorageSorted, StoredNibbles, StoredNibblesSubKey,
+ TrieChangeSetsEntry,
};
use reth_trie_db::{ChangesetCache, DatabaseAccountTrieCursor, DatabaseStorageTrieCursor};
use revm_database::states::{
@@ -520,6 +521,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
);
}
// Collect hashed state for batched k-way merge writes
let mut hashed_states: Vec<Arc<HashedPostStateSorted>> =
Vec::with_capacity(blocks.len());
for (i, block) in blocks.iter().enumerate() {
let recovered_block = block.recovered_block();
@@ -546,17 +551,29 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let trie_data = block.trie_data();
// insert hashes and intermediate merkle nodes
let start = Instant::now();
- self.write_hashed_state(&trie_data.hashed_state)?;
- timings.write_hashed_state += start.elapsed();
// Trie changesets must be written per-block (keyed by block number)
let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
timings.write_trie_updates += start.elapsed();
// Collect hashed state reference for batched write (no clone)
hashed_states.push(Arc::clone(&trie_data.hashed_state));
}
}
// Write hashed state - use k-way merge only for multiple blocks
if !hashed_states.is_empty() {
let start = Instant::now();
if hashed_states.len() == 1 {
// Single block: use sequential write (no merge overhead)
self.write_hashed_state(&hashed_states[0])?;
} else {
// Multiple blocks: use k-way merge for sorted, deduped, sequential I/O
self.write_hashed_state_merged(&hashed_states)?;
}
timings.write_hashed_state = start.elapsed();
}
// Full mode: update history indices
if save_mode.with_state() {
let start = Instant::now();
@@ -730,6 +747,356 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok(())
}
/// Writes hashed state from multiple blocks using k-way merge for sequential I/O.
///
/// This method merges sorted hashed state from multiple blocks and writes them in globally
/// sorted key order, deduplicating keys (last block wins). This provides better I/O locality
/// than writing each block's state separately, as it avoids cursor jumping back and forth
/// in the B-tree.
#[instrument(level = "debug", target = "providers::db", skip_all)]
pub fn write_hashed_state_merged(
&self,
hashed_states: &[Arc<HashedPostStateSorted>],
) -> ProviderResult<()> {
use std::collections::BinaryHeap;
// === Write hashed accounts using k-way merge ===
{
// Entry for the min-heap: (key, block_index, value)
// We use Reverse to make it a min-heap by key, then max by block_index for dedup
#[derive(Eq, PartialEq)]
struct AccountEntry {
key: B256,
block_idx: usize,
value: Option<Account>,
iter_idx: usize, // which iterator this came from
}
impl Ord for AccountEntry {
fn cmp(&self, other: &Self) -> Ordering {
// Min-heap by key, then max by block_idx (so latest block wins on ties)
other.key.cmp(&self.key).then_with(|| self.block_idx.cmp(&other.block_idx))
}
}
impl PartialOrd for AccountEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
// Initialize iterators and heap
let mut iters: Vec<_> = hashed_states
.iter()
.enumerate()
.map(|(block_idx, state)| (block_idx, state.accounts().iter().peekable()))
.collect();
let mut heap = BinaryHeap::new();
// Seed heap with first element from each iterator
for (block_idx, iter) in &mut iters {
if let Some(&(key, value)) = iter.next() {
heap.push(AccountEntry {
key,
block_idx: *block_idx,
value,
iter_idx: *block_idx,
});
}
}
let mut hashed_accounts_cursor =
self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
let mut last_key: Option<B256> = None;
while let Some(entry) = heap.pop() {
// Refill heap from this iterator
if let Some(&(next_key, next_value)) = iters[entry.iter_idx].1.next() {
heap.push(AccountEntry {
key: next_key,
block_idx: entry.block_idx,
value: next_value,
iter_idx: entry.iter_idx,
});
}
// Skip duplicate keys (we already processed this key from a later block)
if last_key == Some(entry.key) {
continue;
}
// Drain any remaining entries with the same key (take latest block)
let mut final_value = entry.value;
let mut final_block_idx = entry.block_idx;
while let Some(next) = heap.peek() {
if next.key != entry.key {
break;
}
let next = heap.pop().unwrap();
// Refill heap
if let Some(&(next_key, next_value)) = iters[next.iter_idx].1.next() {
heap.push(AccountEntry {
key: next_key,
block_idx: next.block_idx,
value: next_value,
iter_idx: next.iter_idx,
});
}
// Take value from later block
if next.block_idx > final_block_idx {
final_value = next.value;
final_block_idx = next.block_idx;
}
}
last_key = Some(entry.key);
// Write to database
if let Some(account) = final_value {
hashed_accounts_cursor.upsert(entry.key, &account)?;
} else if hashed_accounts_cursor.seek_exact(entry.key)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}
}
// === Write hashed storage using k-way merge ===
{
// First, collect all (address, block_idx, storage) tuples and sort by address
let mut all_storages: Vec<(B256, usize, &HashedStorageSorted)> = Vec::new();
for (block_idx, state) in hashed_states.iter().enumerate() {
for (addr, storage) in state.account_storages() {
all_storages.push((*addr, block_idx, storage));
}
}
all_storages.sort_unstable_by_key(|(addr, _, _)| *addr);
let mut hashed_storage_cursor =
self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
// Group by address and process each address's storage with merge
let mut i = 0;
while i < all_storages.len() {
let current_addr = all_storages[i].0;
// Collect all storages for this address
let mut addr_storages: Vec<(usize, &HashedStorageSorted)> = Vec::new();
while i < all_storages.len() && all_storages[i].0 == current_addr {
addr_storages.push((all_storages[i].1, all_storages[i].2));
i += 1;
}
// Check if any block wiped this storage (latest wipe wins)
let mut is_wiped = false;
let mut wipe_block_idx = 0;
for (block_idx, storage) in &addr_storages {
if storage.is_wiped() && *block_idx >= wipe_block_idx {
is_wiped = true;
wipe_block_idx = *block_idx;
}
}
if is_wiped {
if hashed_storage_cursor.seek_exact(current_addr)?.is_some() {
hashed_storage_cursor.delete_current_duplicates()?;
}
// Only include slots from blocks >= wipe_block_idx
addr_storages.retain(|(block_idx, _)| *block_idx >= wipe_block_idx);
}
// K-way merge for storage slots
#[derive(Eq, PartialEq)]
struct SlotEntry<'a> {
key: B256,
value: U256,
block_idx: usize,
iter_idx: usize,
remaining: &'a [(B256, U256)],
}
impl Ord for SlotEntry<'_> {
fn cmp(&self, other: &Self) -> Ordering {
other.key.cmp(&self.key).then_with(|| self.block_idx.cmp(&other.block_idx))
}
}
impl PartialOrd for SlotEntry<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
let mut heap: BinaryHeap<SlotEntry<'_>> = BinaryHeap::new();
// Initialize heap with first slot from each storage
for (idx, (block_idx, storage)) in addr_storages.iter().enumerate() {
let slots = storage.storage_slots_ref();
if let Some((&(key, value), remaining)) = slots.split_first() {
heap.push(SlotEntry {
key,
value,
block_idx: *block_idx,
iter_idx: idx,
remaining,
});
}
}
let mut last_slot: Option<B256> = None;
while let Some(entry) = heap.pop() {
// Refill heap
if let Some((&(next_key, next_value), next_remaining)) =
entry.remaining.split_first()
{
heap.push(SlotEntry {
key: next_key,
value: next_value,
block_idx: entry.block_idx,
iter_idx: entry.iter_idx,
remaining: next_remaining,
});
}
// Skip if we already processed this slot
if last_slot == Some(entry.key) {
continue;
}
// Drain duplicates, taking latest block's value
let mut final_value = entry.value;
let mut final_block_idx = entry.block_idx;
while let Some(next) = heap.peek() {
if next.key != entry.key {
break;
}
let next = heap.pop().unwrap();
if let Some((&(next_key, next_value), next_remaining)) =
next.remaining.split_first()
{
heap.push(SlotEntry {
key: next_key,
value: next_value,
block_idx: next.block_idx,
iter_idx: next.iter_idx,
remaining: next_remaining,
});
}
if next.block_idx > final_block_idx {
final_value = next.value;
final_block_idx = next.block_idx;
}
}
last_slot = Some(entry.key);
// Write to database
let storage_entry = StorageEntry { key: entry.key, value: final_value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(current_addr, storage_entry.key)? &&
db_entry.key == storage_entry.key
{
hashed_storage_cursor.delete_current()?;
}
if !storage_entry.value.is_zero() {
hashed_storage_cursor.upsert(current_addr, &storage_entry)?;
}
}
}
}
Ok(())
}
/// Write receipts for a single block's execution outcome.
///
/// This is used by `save_blocks` to write receipts per-block while batching other state writes.
pub fn write_receipts(
&self,
execution_outcome: &ExecutionOutcome<ReceiptTy<N>>,
config: StateWriteConfig,
) -> ProviderResult<()> {
if !config.write_receipts {
return Ok(());
}
let first_block = execution_outcome.first_block();
let block_count = execution_outcome.len() as u64;
let last_block = execution_outcome.last_block();
let block_range = first_block..=last_block;
let tip = self.last_block_number()?.max(last_block);
// Fetch the first transaction number for each block in the range
let block_indices: Vec<_> = self
.block_body_indices_range(block_range)?
.into_iter()
.map(|b| b.first_tx_num)
.collect();
// Ensure all expected blocks are present.
if block_indices.len() < block_count as usize {
let missing_blocks = block_count - block_indices.len() as u64;
return Err(ProviderError::BlockBodyIndicesNotFound(
last_block.saturating_sub(missing_blocks - 1),
));
}
let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
self.static_file_provider()
.get_highest_static_file_tx(StaticFileSegment::Receipts)
.is_none()) &&
PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
for (_, addresses) in contract_log_pruner.range(..first_block) {
allowed_addresses.extend(addresses.iter().copied());
}
for (idx, (receipts, first_tx_index)) in
execution_outcome.receipts.iter().zip(block_indices).enumerate()
{
let block_number = first_block + idx as u64;
receipts_writer.increment_block(block_number)?;
if prunable_receipts &&
self.prune_modes
.receipts
.is_some_and(|mode| mode.should_prune(block_number, tip))
{
continue;
}
if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
allowed_addresses.extend(new_addresses.iter().copied());
}
for (idx, receipt) in receipts.iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64;
if prunable_receipts &&
has_contract_log_filter &&
!receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
{
continue;
}
receipts_writer.append_receipt(receipt_idx, receipt)?;
}
}
Ok(())
}
}
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {