Compare commits

...

4 Commits

Author SHA1 Message Date
yongkangc
a8be40c029 perf(trie): optimize extend_sorted_vec with mem::take and fast paths
- Use mem::take to move ownership, avoiding clones of target elements
- Add fast path for non-overlapping ranges (just append)
- Use extend_from_slice for empty target case
- Reuse key from target on equal keys, only clone value
2026-01-15 19:32:47 +00:00
yongkangc
9c07dca43b perf(trie): fix extend_sorted_vec O(n log n) → O(n+m) merge
Replace the previous algorithm that appended elements then re-sorted
with a proper two-pointer merge that maintains O(n+m) complexity.

The old implementation would append new elements and call sort_by(),
resulting in O(n log n) complexity per merge. The new implementation
uses a classic merge algorithm that processes both sorted inputs
in a single pass.
2026-01-15 19:32:47 +00:00
joshieDo
b9e15dbd30 feat: add rocksdb to save_blocks (#21003)
Co-authored-by: Sergei Shulepov <s.pepyakin@gmail.com>
Co-authored-by: Sergei Shulepov <pep@tempo.xyz>
Co-authored-by: yongkangc <chiayongkang@hotmail.com>
2026-01-15 19:32:47 +00:00
yongkangc
8c07ee2be4 perf(trie): implement 3-way adaptive merge strategy for merge_batch
- Add prefer_sorted_merge() helper with thresholds: KWAY_MIN_SOURCES=30, PAIRWISE_MIN_AVG_ITEMS=2000
- k >= 30 sources: use k-way merge (avoids O(k) copying amplification)
- k < 30, avg >= 2000 items/source: use pairwise extend_ref (2-3x faster, better cache locality)
- Otherwise: use HashMap merge then sort (lower overhead for small data)

Benchmarks show extend_ref beats kway_merge by 1.3-3.2x for k < 30 sources.
2026-01-15 19:02:15 +00:00
11 changed files with 643 additions and 278 deletions

View File

@@ -2,9 +2,8 @@
/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
///
/// Benchmarked crossover: `extend_ref` wins up to ~64 blocks, `merge_batch` wins beyond.
/// Using 64 as threshold since they're roughly equal there.
const MERGE_BATCH_THRESHOLD: usize = 64;
/// Benchmarked crossover: `extend_ref` wins up to ~30 blocks, `merge_batch` wins beyond.
const MERGE_BATCH_THRESHOLD: usize = 30;
use crate::tree::{
cached_state::CachedStateProvider,

View File

@@ -101,4 +101,11 @@ impl StorageSettings {
self.account_changesets_in_static_files = value;
self
}
/// Returns `true` if any tables are configured to be stored in `RocksDB`.
pub const fn any_in_rocksdb(&self) -> bool {
self.transaction_hash_numbers_in_rocksdb ||
self.account_history_in_rocksdb ||
self.storages_history_in_rocksdb
}
}

View File

@@ -4,7 +4,7 @@ use crate::{
},
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::RocksDBProvider,
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
@@ -188,8 +188,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
@@ -205,10 +205,10 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
.field("rocksdb_provider", &self.rocksdb_provider)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
}
@@ -336,8 +336,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -403,6 +402,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
})
}
/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
}
}
/// Writes executed blocks and state to storage.
///
/// This method parallelizes static file (SF) writes with MDBX writes.
@@ -452,6 +462,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
thread::scope(|s| {
// SF writes
@@ -461,6 +475,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok::<_, ProviderError>(start.elapsed())
});
// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
s.spawn(|| {
let start = Instant::now();
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
Ok::<_, ProviderError>(start.elapsed())
})
});
// MDBX writes
let mdbx_start = Instant::now();
@@ -557,6 +581,12 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
.join()
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
// Wait for RocksDB thread
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(handle) = rocksdb_handle {
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
}
timings.total = total_start.elapsed();
self.metrics.record_save_blocks(&timings);
@@ -821,8 +851,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
@@ -3167,14 +3196,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
// account history stage
{
let storage_settings = self.cached_storage_settings();
if !storage_settings.account_history_in_rocksdb {
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
// storage history stage
{
if !storage_settings.storages_history_in_rocksdb {
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}

View File

@@ -4,4 +4,5 @@ mod invariants;
mod metrics;
mod provider;
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -1,11 +1,16 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use alloy_primitives::{Address, BlockNumber, B256};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
provider::{ProviderError, ProviderResult},
@@ -16,11 +21,41 @@ use rocksdb::{
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
collections::BTreeMap,
fmt,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Instant,
};
use tracing::instrument;
/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
}
impl fmt::Debug for RocksDBWriteCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBWriteCtx")
.field("first_block_number", &self.first_block_number)
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.finish()
}
}
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
@@ -474,6 +509,125 @@ impl RocksDBProvider {
}))
})
}
/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on
/// the provided storage settings. Each operation runs in parallel with its own batch,
/// pushing to `ctx.pending_batches` for later commit.
#[instrument(level = "debug", target = "providers::db", skip_all)]
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: RocksDBWriteCtx,
) -> ProviderResult<()> {
if !ctx.storage_settings.any_in_rocksdb() {
return Ok(());
}
thread::scope(|s| {
let handles: Vec<_> = [
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
ctx.storage_settings
.account_history_in_rocksdb
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
ctx.storage_settings
.storages_history_in_rocksdb
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
]
.into_iter()
.enumerate()
.filter_map(|(i, h)| h.map(|h| (i, h)))
.collect();
for (i, handle) in handles {
handle.join().map_err(|_| {
ProviderError::Database(DatabaseError::Other(format!(
"rocksdb write thread {i} panicked"
)))
})??;
}
Ok(())
})
}
/// Writes transaction hash to number mappings for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}
/// Handle for building a batch of operations atomically.

View File

@@ -2,28 +2,42 @@
//!
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
//! Operations will produce errors if actually attempted.
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
use reth_db_api::table::{Encode, Table};
use reth_storage_errors::{
db::LogLevel,
provider::{ProviderError::UnsupportedProvider, ProviderResult},
};
use std::path::Path;
use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use reth_db_api::models::StorageSettings;
use reth_prune_types::PruneMode;
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
use std::{path::Path, sync::Arc};
/// Pending `RocksDB` batches type alias (stub - uses unit type).
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
/// Context for `RocksDB` block writes (stub).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches (stub - unused).
pub pending_batches: PendingRocksDBBatches,
}
/// A stub `RocksDB` provider.
///
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
/// route to MDBX instead.
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
#[derive(Debug, Clone)]
pub struct RocksDBProvider;
impl RocksDBProvider {
/// Creates a new stub `RocksDB` provider.
///
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
Ok(Self)
}
@@ -33,130 +47,22 @@ impl RocksDBProvider {
RocksDBBuilder::new(path)
}
/// Get a value from `RocksDB` (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Delete a value from `RocksDB` (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Write a batch of operations (stub implementation).
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
where
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
{
Err(UnsupportedProvider)
}
/// Creates a new transaction (stub implementation).
pub const fn tx(&self) -> RocksTx {
RocksTx
}
/// Creates a new batch for atomic writes (stub implementation).
pub const fn batch(&self) -> RocksDBBatch {
RocksDBBatch
}
/// Gets the first key-value pair from a table (stub implementation).
pub const fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Gets the last key-value pair from a table (stub implementation).
pub const fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
Ok(None)
}
/// Creates an iterator for the specified table (stub implementation).
///
/// Returns an empty iterator. This is consistent with `first()` and `last()` returning
/// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable.
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
Ok(RocksDBIter { _marker: std::marker::PhantomData })
}
/// Check consistency of `RocksDB` tables (stub implementation).
///
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
pub const fn check_consistency<Provider>(
&self,
_provider: &Provider,
) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
) -> ProviderResult<Option<BlockNumber>> {
Ok(None)
}
}
/// A stub batch writer for `RocksDB` on non-Unix platforms.
/// A stub batch writer for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBatch;
impl RocksDBBatch {
/// Puts a value into the batch (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value into the batch using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the batch (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Commits the batch (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` (non-transactional).
#[derive(Debug)]
pub struct RocksDBIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksDBIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
/// A stub builder for `RocksDB` on non-Unix platforms.
/// A stub builder for `RocksDB`.
#[derive(Debug)]
pub struct RocksDBBuilder;
@@ -167,7 +73,7 @@ impl RocksDBBuilder {
}
/// Adds a column family for a specific table type (stub implementation).
pub const fn with_table<T: Table>(self) -> Self {
pub const fn with_table<T>(self) -> Self {
self
}
@@ -205,71 +111,3 @@ impl RocksDBBuilder {
/// A stub transaction for `RocksDB`.
#[derive(Debug)]
pub struct RocksTx;
impl RocksTx {
/// Gets a value from the specified table (stub implementation).
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Gets a value using pre-encoded key (stub implementation).
pub const fn get_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
) -> ProviderResult<Option<T::Value>> {
Err(UnsupportedProvider)
}
/// Puts a value into the specified table (stub implementation).
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Puts a value using pre-encoded key (stub implementation).
pub const fn put_encoded<T: Table>(
&self,
_key: &<T::Key as Encode>::Encoded,
_value: &T::Value,
) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Deletes a value from the specified table (stub implementation).
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Creates an iterator for the specified table (stub implementation).
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Creates an iterator starting from the given key (stub implementation).
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
Err(UnsupportedProvider)
}
/// Commits the transaction (stub implementation).
pub const fn commit(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
/// Rolls back the transaction (stub implementation).
pub const fn rollback(self) -> ProviderResult<()> {
Err(UnsupportedProvider)
}
}
/// A stub iterator for `RocksDB` transactions.
#[derive(Debug)]
pub struct RocksTxIter<'a, T> {
_marker: std::marker::PhantomData<(&'a (), T)>,
}
impl<T: Table> Iterator for RocksTxIter<'_, T> {
type Item = ProviderResult<(T::Key, T::Value)>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}

View File

@@ -141,3 +141,7 @@ harness = false
name = "hashed_state"
harness = false
required-features = ["rayon"]
[[bench]]
name = "merge_strategies"
harness = false

View File

@@ -0,0 +1,131 @@
#![allow(missing_docs, unreachable_pub)]
use alloy_primitives::{B256, U256};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use reth_primitives_traits::Account;
use reth_trie_common::{HashedPostState, HashedPostStateSorted, HashedStorage, HashedStorageSorted};
use std::collections::HashMap;
fn keccak256_mock(n: u64) -> B256 {
let mut bytes = [0u8; 32];
bytes[24..].copy_from_slice(&n.to_be_bytes());
for (i, item) in bytes.iter_mut().enumerate() {
*item ^= ((n.wrapping_mul(0x9e3779b97f4a7c15) >> (i * 8)) & 0xff) as u8;
}
B256::from(bytes)
}
fn generate_hashed_post_state_sorted(
base_offset: u64,
num_accounts: usize,
storage_slots_per_account: usize,
) -> HashedPostStateSorted {
let mut state = HashedPostState::default();
for i in 0..num_accounts {
let hashed_address = keccak256_mock(base_offset + i as u64);
let account = Some(Account {
nonce: i as u64,
balance: U256::from(i * 1000),
bytecode_hash: None,
});
state.accounts.insert(hashed_address, account);
if storage_slots_per_account > 0 {
let mut storage = HashedStorage::new(false);
for j in 0..storage_slots_per_account {
let slot = keccak256_mock(base_offset * 1000 + (i * 100 + j) as u64);
storage.storage.insert(slot, U256::from(j));
}
state.storages.insert(hashed_address, storage);
}
}
state.into_sorted()
}
fn generate_states(
num_sources: usize,
items_per_source: usize,
) -> Vec<HashedPostStateSorted> {
(0..num_sources)
.map(|i| {
generate_hashed_post_state_sorted(
(i * 1_000_000) as u64,
items_per_source,
5,
)
})
.collect()
}
fn extend_ref_chain(states: &[HashedPostStateSorted]) -> HashedPostStateSorted {
if states.is_empty() {
return HashedPostStateSorted::default();
}
let mut result = states[0].clone();
for state in &states[1..] {
result.extend_ref(state);
}
result
}
fn bench_merge_strategies(c: &mut Criterion) {
let mut group = c.benchmark_group("HashedPostStateSorted_merge");
// Test various source counts and items per source
// We want to find where extend_ref beats kway_merge
let configs = [
// (num_sources, items_per_source)
(5, 100),
(5, 500),
(5, 1000),
(5, 2000),
(5, 5000),
(5, 10000),
(10, 100),
(10, 500),
(10, 1000),
(10, 2000),
(10, 5000),
(20, 100),
(20, 500),
(20, 1000),
(20, 2000),
];
for (num_sources, items_per_source) in configs {
let states = generate_states(num_sources, items_per_source);
let total_items = num_sources * items_per_source;
let label = format!("{}src_{}items", num_sources, items_per_source);
group.throughput(Throughput::Elements(total_items as u64));
group.bench_with_input(
BenchmarkId::new("kway_merge", &label),
&states,
|b, states| {
b.iter(|| {
let result = HashedPostStateSorted::merge_batch(black_box(states.iter()));
black_box(result)
});
},
);
group.bench_with_input(
BenchmarkId::new("extend_ref_new", &label),
&states,
|b, states| {
b.iter(|| {
let result = extend_ref_chain(black_box(states));
black_box(result)
});
},
);
}
group.finish();
}
criterion_group!(benches, bench_merge_strategies);
criterion_main!(benches);

View File

@@ -636,30 +636,46 @@ impl HashedPostStateSorted {
/// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
///
/// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
let states: Vec<_> = states.into_iter().collect();
if states.is_empty() {
return Self::default();
}
match states.len() {
0 => return Self::default(),
1 => return states[0].clone(),
n => {
let total_items: usize = states.iter().map(|s| s.total_len()).sum();
if n >= crate::utils::KWAY_MIN_SOURCES {
Self::merge_batch_kway(&states)
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = states[0].clone();
for state in &states[1..] {
result.extend_ref(state);
}
result
} else {
Self::merge_batch_hashmap(&states)
}
}
}
}
/// K-way merge implementation for many sources.
fn merge_batch_kway<'a>(states: &[&'a Self]) -> Self {
let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
struct StorageAcc<'a> {
/// Account storage was cleared (e.g., SELFDESTRUCT).
wiped: bool,
/// Stop collecting older slices after seeing a wipe.
sealed: bool,
/// Storage slot slices to merge, ordered newest to oldest.
slices: Vec<&'a [(B256, U256)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
// Accumulate storage slices per address from newest to oldest state.
// Once we see a `wiped` flag, the account was cleared at that point,
// so older storage slots are irrelevant - we "seal" and stop collecting.
for state in &states {
for state in states {
for (addr, storage) in &state.storages {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
wiped: false,
@@ -690,6 +706,15 @@ impl HashedPostStateSorted {
Self { accounts, storages }
}
/// HashMap-based merge for small data with low per-element overhead.
fn merge_batch_hashmap<'a>(states: &[&'a Self]) -> Self {
let mut unsorted = HashedPostState::default();
for state in states.iter().rev() {
unsorted.extend_from_sorted(state);
}
unsorted.into_sorted()
}
/// Clears all accounts and storage data.
pub fn clear(&mut self) {
self.accounts.clear();
@@ -753,17 +778,49 @@ impl HashedStorageSorted {
/// Batch-merge sorted hashed storage. Iterator yields **newest to oldest**.
/// If any update is wiped, prior data is discarded.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
let wipe_idx = updates.iter().position(|u| u.wiped);
let relevant = wipe_idx.map_or(&updates[..], |idx| &updates[..=idx]);
let storage_slots = kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()));
Self { wiped: wipe_idx.is_some(), storage_slots }
match relevant.len() {
0 => return Self::default(),
1 => return Self { wiped: wipe_idx.is_some(), ..relevant[0].clone() },
n => {
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
let storage_slots = if n >= crate::utils::KWAY_MIN_SOURCES {
kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()))
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = relevant[0].storage_slots.clone();
for update in &relevant[1..] {
extend_sorted_vec(&mut result, &update.storage_slots);
}
result
} else {
Self::merge_batch_hashmap(relevant)
};
Self { wiped: wipe_idx.is_some(), storage_slots }
}
}
}
fn merge_batch_hashmap(updates: &[&Self]) -> Vec<(B256, U256)> {
let mut map: B256Map<U256> = B256Map::default();
for update in updates.iter().rev() {
for &(slot, value) in &update.storage_slots {
map.insert(slot, value);
}
}
let mut result: Vec<_> = map.into_iter().collect();
result.sort_unstable_by_key(|(k, _)| *k);
result
}
}

View File

@@ -626,33 +626,45 @@ impl TrieUpdatesSorted {
/// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
///
/// This is more efficient than repeated `extend_ref` calls for large batches,
/// using k-way merge for O(n log k) complexity instead of O(n * k).
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
match updates.len() {
0 => return Self::default(),
1 => return updates[0].clone(),
n => {
let total_items: usize = updates.iter().map(|u| u.total_len()).sum();
// Merge account nodes using k-way merge. Newest (index 0) takes precedence.
if n >= crate::utils::KWAY_MIN_SOURCES {
Self::merge_batch_kway(&updates)
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = updates[0].clone();
for update in &updates[1..] {
result.extend_ref(update);
}
result
} else {
Self::merge_batch_hashmap(&updates)
}
}
}
}
fn merge_batch_kway<'a>(updates: &[&'a Self]) -> Self {
let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
// Accumulator for collecting storage trie slices per address.
// We process updates newest-to-oldest and stop collecting for an address
// once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
struct StorageAcc<'a> {
/// Storage trie was deleted (account removed or cleared).
is_deleted: bool,
/// Stop collecting older slices after seeing a deletion.
sealed: bool,
/// Storage trie node slices to merge, ordered newest to oldest.
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
}
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
// Collect storage slices per address, respecting deletion boundaries
for update in &updates {
for update in updates {
for (addr, storage) in &update.storage_tries {
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
is_deleted: false,
@@ -660,14 +672,12 @@ impl TrieUpdatesSorted {
slices: Vec::new(),
});
// Skip if we already hit a deletion for this address (older data is irrelevant)
if entry.sealed {
continue;
}
entry.slices.push(storage.storage_nodes.as_slice());
// If this storage was deleted, mark as deleted and seal to ignore older updates
if storage.is_deleted {
entry.is_deleted = true;
entry.sealed = true;
@@ -675,7 +685,6 @@ impl TrieUpdatesSorted {
}
}
// Merge each address's storage slices using k-way merge
let storage_tries = acc
.into_iter()
.map(|(addr, entry)| {
@@ -686,6 +695,14 @@ impl TrieUpdatesSorted {
Self { account_nodes, storage_tries }
}
fn merge_batch_hashmap<'a>(updates: &[&'a Self]) -> Self {
let mut unsorted = TrieUpdates::default();
for update in updates.iter().rev() {
unsorted.extend_from_sorted(update);
}
unsorted.into_sorted()
}
}
impl AsRef<Self> for TrieUpdatesSorted {
@@ -780,18 +797,51 @@ impl StorageTrieUpdatesSorted {
/// Batch-merge sorted storage trie updates. Iterator yields **newest to oldest**.
/// If any update is deleted, older data is discarded.
///
/// Uses adaptive merge strategy:
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
let updates: Vec<_> = updates.into_iter().collect();
if updates.is_empty() {
return Self::default();
}
// Discard updates older than the first deletion since the trie was wiped at that point.
let del_idx = updates.iter().position(|u| u.is_deleted);
let relevant = del_idx.map_or(&updates[..], |idx| &updates[..=idx]);
let storage_nodes = kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()));
Self { is_deleted: del_idx.is_some(), storage_nodes }
match relevant.len() {
0 => return Self::default(),
1 => return Self { is_deleted: del_idx.is_some(), ..relevant[0].clone() },
n => {
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
let storage_nodes = if n >= crate::utils::KWAY_MIN_SOURCES {
kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()))
} else if crate::utils::prefer_sorted_merge(n, total_items) {
let mut result = relevant[0].storage_nodes.clone();
for update in &relevant[1..] {
extend_sorted_vec(&mut result, &update.storage_nodes);
}
result
} else {
Self::merge_batch_hashmap(relevant)
};
Self { is_deleted: del_idx.is_some(), storage_nodes }
}
}
}
fn merge_batch_hashmap(
updates: &[&Self],
) -> Vec<(Nibbles, Option<BranchNodeCompact>)> {
let mut map: HashMap<Nibbles, Option<BranchNodeCompact>> = HashMap::default();
for update in updates.iter().rev() {
for (nibbles, node) in &update.storage_nodes {
map.insert(*nibbles, node.clone());
}
}
let mut result: Vec<_> = map.into_iter().collect();
result.sort_unstable_by_key(|(k, _)| *k);
result
}
}

View File

@@ -2,6 +2,22 @@ use alloc::vec::Vec;
use core::cmp::Ordering;
use itertools::Itertools;
/// Minimum average items per source to prefer pairwise sorted merge over HashMap merge.
pub(crate) const PAIRWISE_MIN_AVG_ITEMS: usize = 2000;
/// Minimum number of sources that triggers k-way merge instead of pairwise sorted merge.
pub(crate) const KWAY_MIN_SOURCES: usize = 30;
/// Returns true if pairwise sorted merge is preferred over HashMap merge.
/// Returns false if k >= KWAY_MIN_SOURCES (use kway) or avg items < threshold (use HashMap).
#[inline]
pub(crate) fn prefer_sorted_merge(num_sources: usize, total_items: usize) -> bool {
if num_sources >= KWAY_MIN_SOURCES {
return false;
}
total_items >= PAIRWISE_MIN_AVG_ITEMS.saturating_mul(num_sources)
}
/// Merge sorted slices into a sorted `Vec`. First occurrence wins for duplicate keys.
///
/// Callers pass slices in priority order (index 0 = highest priority), so the first
@@ -26,45 +42,56 @@ where
.collect()
}
/// Extend a sorted vector with another sorted vector.
/// Values from `other` take precedence for duplicate keys.
///
/// Extend a sorted vector with another sorted vector using O(n+m) merge.
/// Values from `other` take precedence for duplicate keys.
pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
where
K: Clone + Ord,
V: Clone,
{
let cmp = |a: &(K, V), b: &(K, V)| a.0.cmp(&b.0);
if other.is_empty() {
return;
}
let mut other_iter = other.iter().peekable();
let initial_len = target.len();
for i in 0..initial_len {
while let Some(other_item) = other_iter.peek() {
let target_item = &mut target[i];
match cmp(other_item, target_item) {
Ordering::Less => {
target.push(other_iter.next().unwrap().clone());
}
Ordering::Equal => {
target_item.1 = other_iter.next().unwrap().1.clone();
break;
}
Ordering::Greater => {
break;
}
if target.is_empty() {
target.extend_from_slice(other);
return;
}
// Fast path: non-overlapping ranges - just append
if target.last().map(|(k, _)| k) < other.first().map(|(k, _)| k) {
target.extend_from_slice(other);
return;
}
// Move ownership of target to avoid cloning owned elements
let left = core::mem::take(target);
let mut out = Vec::with_capacity(left.len() + other.len());
let mut a = left.into_iter().peekable();
let mut b = other.iter().peekable();
while let (Some(aa), Some(bb)) = (a.peek(), b.peek()) {
match aa.0.cmp(&bb.0) {
Ordering::Less => {
out.push(a.next().unwrap());
}
Ordering::Greater => {
out.push(b.next().unwrap().clone());
}
Ordering::Equal => {
// `other` takes precedence for duplicate keys - reuse key from `a`
let (k, _) = a.next().unwrap();
out.push((k, b.next().unwrap().1.clone()));
}
}
}
target.extend(other_iter.cloned());
if target.len() > initial_len {
target.sort_by(cmp);
}
// Drain remaining: `a` moves, `b` clones
out.extend(a);
out.extend(b.cloned());
*target = out;
}
#[cfg(test)]
@@ -79,6 +106,55 @@ mod tests {
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c_new")]);
}
#[test]
fn test_extend_sorted_vec_empty_target() {
let mut target: Vec<(i32, &str)> = vec![];
let other = vec![(1, "a"), (2, "b")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b")]);
}
#[test]
fn test_extend_sorted_vec_empty_other() {
let mut target = vec![(1, "a"), (2, "b")];
let other: Vec<(i32, &str)> = vec![];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b")]);
}
#[test]
fn test_extend_sorted_vec_all_duplicates() {
let mut target = vec![(1, "old1"), (2, "old2"), (3, "old3")];
let other = vec![(1, "new1"), (2, "new2"), (3, "new3")];
extend_sorted_vec(&mut target, &other);
// other takes precedence
assert_eq!(target, vec![(1, "new1"), (2, "new2"), (3, "new3")]);
}
#[test]
fn test_extend_sorted_vec_interleaved() {
let mut target = vec![(1, "a"), (3, "c"), (5, "e")];
let other = vec![(2, "b"), (4, "d"), (6, "f")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")]);
}
#[test]
fn test_extend_sorted_vec_other_all_smaller() {
let mut target = vec![(5, "e"), (6, "f")];
let other = vec![(1, "a"), (2, "b")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
#[test]
fn test_extend_sorted_vec_other_all_larger() {
let mut target = vec![(1, "a"), (2, "b")];
let other = vec![(5, "e"), (6, "f")];
extend_sorted_vec(&mut target, &other);
assert_eq!(target, vec![(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
#[test]
fn test_kway_merge_sorted_basic() {
let slice1 = vec![(1, "a1"), (3, "c1")];
@@ -123,4 +199,24 @@ mod tests {
let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
assert!(result.is_empty());
}
#[test]
fn test_prefer_sorted_merge_kway_threshold() {
assert!(!prefer_sorted_merge(30, 100_000));
assert!(!prefer_sorted_merge(50, 200_000));
assert!(prefer_sorted_merge(29, 29 * PAIRWISE_MIN_AVG_ITEMS));
}
#[test]
fn test_prefer_sorted_merge_pairwise_threshold() {
assert!(prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS));
assert!(prefer_sorted_merge(10, 10 * PAIRWISE_MIN_AVG_ITEMS + 1));
assert!(!prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS - 1));
}
#[test]
fn test_prefer_sorted_merge_small_data() {
assert!(!prefer_sorted_merge(2, 100));
assert!(!prefer_sorted_merge(5, 1000));
}
}