mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a8be40c029 | ||
|
|
9c07dca43b | ||
|
|
b9e15dbd30 | ||
|
|
8c07ee2be4 |
@@ -2,9 +2,8 @@
|
||||
|
||||
/// Threshold for switching from `extend_ref` loop to `merge_batch` in `merge_overlay_trie_input`.
|
||||
///
|
||||
/// Benchmarked crossover: `extend_ref` wins up to ~64 blocks, `merge_batch` wins beyond.
|
||||
/// Using 64 as threshold since they're roughly equal there.
|
||||
const MERGE_BATCH_THRESHOLD: usize = 64;
|
||||
/// Benchmarked crossover: `extend_ref` wins up to ~30 blocks, `merge_batch` wins beyond.
|
||||
const MERGE_BATCH_THRESHOLD: usize = 30;
|
||||
|
||||
use crate::tree::{
|
||||
cached_state::CachedStateProvider,
|
||||
|
||||
@@ -101,4 +101,11 @@ impl StorageSettings {
|
||||
self.account_changesets_in_static_files = value;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns `true` if any tables are configured to be stored in `RocksDB`.
|
||||
pub const fn any_in_rocksdb(&self) -> bool {
|
||||
self.transaction_hash_numbers_in_rocksdb ||
|
||||
self.account_history_in_rocksdb ||
|
||||
self.storages_history_in_rocksdb
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
},
|
||||
providers::{
|
||||
database::{chain::ChainStorage, metrics},
|
||||
rocksdb::RocksDBProvider,
|
||||
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
|
||||
static_file::{StaticFileWriteCtx, StaticFileWriter},
|
||||
NodeTypesForProvider, StaticFileProvider,
|
||||
},
|
||||
@@ -188,8 +188,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// `RocksDB` provider
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
pending_rocksdb_batches: PendingRocksDBBatches,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
/// Database provider metrics
|
||||
@@ -205,10 +205,10 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
|
||||
.field("prune_modes", &self.prune_modes)
|
||||
.field("storage", &self.storage)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("rocksdb_provider", &self.rocksdb_provider);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
s.field("pending_rocksdb_batches", &"<pending batches>");
|
||||
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
|
||||
.field("rocksdb_provider", &self.rocksdb_provider)
|
||||
.field("pending_rocksdb_batches", &"<pending batches>")
|
||||
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -336,8 +336,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -403,6 +402,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates the context for `RocksDB` writes.
|
||||
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
|
||||
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
|
||||
RocksDBWriteCtx {
|
||||
first_block_number: first_block,
|
||||
prune_tx_lookup: self.prune_modes.transaction_lookup,
|
||||
storage_settings: self.cached_storage_settings(),
|
||||
pending_batches: self.pending_rocksdb_batches.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes executed blocks and state to storage.
|
||||
///
|
||||
/// This method parallelizes static file (SF) writes with MDBX writes.
|
||||
@@ -452,6 +462,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
// avoid capturing &self.tx in scope below.
|
||||
let sf_provider = &self.static_file_provider;
|
||||
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_provider = self.rocksdb_provider.clone();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
|
||||
|
||||
thread::scope(|s| {
|
||||
// SF writes
|
||||
@@ -461,6 +475,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
});
|
||||
|
||||
// RocksDB writes
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
|
||||
s.spawn(|| {
|
||||
let start = Instant::now();
|
||||
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
|
||||
Ok::<_, ProviderError>(start.elapsed())
|
||||
})
|
||||
});
|
||||
|
||||
// MDBX writes
|
||||
let mdbx_start = Instant::now();
|
||||
|
||||
@@ -557,6 +581,12 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
.join()
|
||||
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
|
||||
|
||||
// Wait for RocksDB thread
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(handle) = rocksdb_handle {
|
||||
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
|
||||
}
|
||||
|
||||
timings.total = total_start.elapsed();
|
||||
|
||||
self.metrics.record_save_blocks(&timings);
|
||||
@@ -821,8 +851,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
|
||||
pending_rocksdb_batches: Default::default(),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
metrics: metrics::DatabaseProviderMetrics::default(),
|
||||
}
|
||||
@@ -3167,14 +3196,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
|
||||
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
|
||||
// account history stage
|
||||
{
|
||||
let storage_settings = self.cached_storage_settings();
|
||||
if !storage_settings.account_history_in_rocksdb {
|
||||
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
|
||||
self.insert_account_history_index(indices)?;
|
||||
}
|
||||
|
||||
// storage history stage
|
||||
{
|
||||
if !storage_settings.storages_history_in_rocksdb {
|
||||
let indices = self.changed_storages_and_blocks_with_range(range)?;
|
||||
self.insert_storage_history_index(indices)?;
|
||||
}
|
||||
|
||||
@@ -4,4 +4,5 @@ mod invariants;
|
||||
mod metrics;
|
||||
mod provider;
|
||||
|
||||
pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
|
||||
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
|
||||
|
||||
@@ -1,11 +1,16 @@
|
||||
use super::metrics::{RocksDBMetrics, RocksDBOperation};
|
||||
use crate::providers::{needs_prev_shard_check, HistoryInfo};
|
||||
use alloy_primitives::{Address, BlockNumber, B256};
|
||||
use alloy_consensus::transaction::TxHashRef;
|
||||
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
|
||||
use parking_lot::Mutex;
|
||||
use reth_chain_state::ExecutedBlock;
|
||||
use reth_db_api::{
|
||||
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
|
||||
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
|
||||
table::{Compress, Decode, Decompress, Encode, Table},
|
||||
tables, BlockNumberList, DatabaseError,
|
||||
};
|
||||
use reth_primitives_traits::BlockBody as _;
|
||||
use reth_prune_types::PruneMode;
|
||||
use reth_storage_errors::{
|
||||
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
|
||||
provider::{ProviderError, ProviderResult},
|
||||
@@ -16,11 +21,41 @@ use rocksdb::{
|
||||
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
|
||||
};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
thread,
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::instrument;
|
||||
|
||||
/// Pending `RocksDB` batches type alias.
|
||||
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;
|
||||
|
||||
/// Context for `RocksDB` block writes.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RocksDBWriteCtx {
|
||||
/// The first block number being written.
|
||||
pub first_block_number: BlockNumber,
|
||||
/// The prune mode for transaction lookup, if any.
|
||||
pub prune_tx_lookup: Option<PruneMode>,
|
||||
/// Storage settings determining what goes to `RocksDB`.
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches to push to after writing.
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
}
|
||||
|
||||
impl fmt::Debug for RocksDBWriteCtx {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("RocksDBWriteCtx")
|
||||
.field("first_block_number", &self.first_block_number)
|
||||
.field("prune_tx_lookup", &self.prune_tx_lookup)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("pending_batches", &"<pending batches>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Default cache size for `RocksDB` block cache (128 MB).
|
||||
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
|
||||
@@ -474,6 +509,125 @@ impl RocksDBProvider {
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
/// Writes all `RocksDB` data for multiple blocks in parallel.
|
||||
///
|
||||
/// This handles transaction hash numbers, account history, and storage history based on
|
||||
/// the provided storage settings. Each operation runs in parallel with its own batch,
|
||||
/// pushing to `ctx.pending_batches` for later commit.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
tx_nums: &[TxNumber],
|
||||
ctx: RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
if !ctx.storage_settings.any_in_rocksdb() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
thread::scope(|s| {
|
||||
let handles: Vec<_> = [
|
||||
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
|
||||
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
|
||||
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
|
||||
ctx.storage_settings
|
||||
.account_history_in_rocksdb
|
||||
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
|
||||
ctx.storage_settings
|
||||
.storages_history_in_rocksdb
|
||||
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
|
||||
]
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter_map(|(i, h)| h.map(|h| (i, h)))
|
||||
.collect();
|
||||
|
||||
for (i, handle) in handles {
|
||||
handle.join().map_err(|_| {
|
||||
ProviderError::Database(DatabaseError::Other(format!(
|
||||
"rocksdb write thread {i} panicked"
|
||||
)))
|
||||
})??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Writes transaction hash to number mappings for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
tx_nums: &[TxNumber],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
|
||||
let body = block.recovered_block().body();
|
||||
let mut tx_num = first_tx_num;
|
||||
for transaction in body.transactions_iter() {
|
||||
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
|
||||
tx_num += 1;
|
||||
}
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes account history indices for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_account_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let bundle = &block.execution_outcome().bundle;
|
||||
for &address in bundle.state().keys() {
|
||||
account_history.entry(address).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
for (address, blocks) in account_history {
|
||||
let key = ShardedKey::new(address, u64::MAX);
|
||||
let value = BlockNumberList::new_pre_sorted(blocks);
|
||||
batch.put::<tables::AccountsHistory>(key, &value)?;
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes storage history indices for the given blocks.
|
||||
#[instrument(level = "debug", target = "providers::db", skip_all)]
|
||||
fn write_storage_history<N: reth_node_types::NodePrimitives>(
|
||||
&self,
|
||||
blocks: &[ExecutedBlock<N>],
|
||||
ctx: &RocksDBWriteCtx,
|
||||
) -> ProviderResult<()> {
|
||||
let mut batch = self.batch();
|
||||
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
|
||||
for (block_idx, block) in blocks.iter().enumerate() {
|
||||
let block_number = ctx.first_block_number + block_idx as u64;
|
||||
let bundle = &block.execution_outcome().bundle;
|
||||
for (&address, account) in bundle.state() {
|
||||
for &slot in account.storage.keys() {
|
||||
let key = B256::new(slot.to_be_bytes());
|
||||
storage_history.entry((address, key)).or_default().push(block_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
for ((address, slot), blocks) in storage_history {
|
||||
let key = StorageShardedKey::new(address, slot, u64::MAX);
|
||||
let value = BlockNumberList::new_pre_sorted(blocks);
|
||||
batch.put::<tables::StoragesHistory>(key, &value)?;
|
||||
}
|
||||
ctx.pending_batches.lock().push(batch.into_inner());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle for building a batch of operations atomically.
|
||||
|
||||
@@ -2,28 +2,42 @@
|
||||
//!
|
||||
//! This module provides placeholder types that allow the code to compile when `RocksDB` is not
|
||||
//! available (either on non-Unix platforms or when the `rocksdb` feature is not enabled).
|
||||
//! Operations will produce errors if actually attempted.
|
||||
//! All method calls are cfg-guarded in the calling code, so only type definitions are needed here.
|
||||
|
||||
use reth_db_api::table::{Encode, Table};
|
||||
use reth_storage_errors::{
|
||||
db::LogLevel,
|
||||
provider::{ProviderError::UnsupportedProvider, ProviderResult},
|
||||
};
|
||||
use std::path::Path;
|
||||
use alloy_primitives::BlockNumber;
|
||||
use parking_lot::Mutex;
|
||||
use reth_db_api::models::StorageSettings;
|
||||
use reth_prune_types::PruneMode;
|
||||
use reth_storage_errors::{db::LogLevel, provider::ProviderResult};
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
/// Pending `RocksDB` batches type alias (stub - uses unit type).
|
||||
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<()>>>;
|
||||
|
||||
/// Context for `RocksDB` block writes (stub).
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) struct RocksDBWriteCtx {
|
||||
/// The first block number being written.
|
||||
pub first_block_number: BlockNumber,
|
||||
/// The prune mode for transaction lookup, if any.
|
||||
pub prune_tx_lookup: Option<PruneMode>,
|
||||
/// Storage settings determining what goes to `RocksDB`.
|
||||
pub storage_settings: StorageSettings,
|
||||
/// Pending batches (stub - unused).
|
||||
pub pending_batches: PendingRocksDBBatches,
|
||||
}
|
||||
|
||||
/// A stub `RocksDB` provider.
|
||||
///
|
||||
/// This type exists to allow code to compile when `RocksDB` is not available (either on non-Unix
|
||||
/// platforms or when the `rocksdb` feature is not enabled). When using this stub, the
|
||||
/// `transaction_hash_numbers_in_rocksdb` flag should be set to `false` to ensure all operations
|
||||
/// route to MDBX instead.
|
||||
/// platforms or when the `rocksdb` feature is not enabled). All method calls on `RocksDBProvider`
|
||||
/// are cfg-guarded in the calling code, so this stub only provides type definitions.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RocksDBProvider;
|
||||
|
||||
impl RocksDBProvider {
|
||||
/// Creates a new stub `RocksDB` provider.
|
||||
///
|
||||
/// On non-Unix platforms, this returns an error indicating `RocksDB` is not supported.
|
||||
pub fn new(_path: impl AsRef<Path>) -> ProviderResult<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
@@ -33,130 +47,22 @@ impl RocksDBProvider {
|
||||
RocksDBBuilder::new(path)
|
||||
}
|
||||
|
||||
/// Get a value from `RocksDB` (stub implementation).
|
||||
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Get a value from `RocksDB` using pre-encoded key (stub implementation).
|
||||
pub const fn get_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Put a value into `RocksDB` (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Put a value into `RocksDB` using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Delete a value from `RocksDB` (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Write a batch of operations (stub implementation).
|
||||
pub fn write_batch<F>(&self, _f: F) -> ProviderResult<()>
|
||||
where
|
||||
F: FnOnce(&mut RocksDBBatch) -> ProviderResult<()>,
|
||||
{
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates a new transaction (stub implementation).
|
||||
pub const fn tx(&self) -> RocksTx {
|
||||
RocksTx
|
||||
}
|
||||
|
||||
/// Creates a new batch for atomic writes (stub implementation).
|
||||
pub const fn batch(&self) -> RocksDBBatch {
|
||||
RocksDBBatch
|
||||
}
|
||||
|
||||
/// Gets the first key-value pair from a table (stub implementation).
|
||||
pub const fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Gets the last key-value pair from a table (stub implementation).
|
||||
pub const fn last<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Creates an iterator for the specified table (stub implementation).
|
||||
///
|
||||
/// Returns an empty iterator. This is consistent with `first()` and `last()` returning
|
||||
/// `Ok(None)` - the stub behaves as if the database is empty rather than unavailable.
|
||||
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksDBIter<'_, T>> {
|
||||
Ok(RocksDBIter { _marker: std::marker::PhantomData })
|
||||
}
|
||||
|
||||
/// Check consistency of `RocksDB` tables (stub implementation).
|
||||
///
|
||||
/// Returns `None` since there is no `RocksDB` data to check when the feature is disabled.
|
||||
pub const fn check_consistency<Provider>(
|
||||
&self,
|
||||
_provider: &Provider,
|
||||
) -> ProviderResult<Option<alloy_primitives::BlockNumber>> {
|
||||
) -> ProviderResult<Option<BlockNumber>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub batch writer for `RocksDB` on non-Unix platforms.
|
||||
/// A stub batch writer for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBatch;
|
||||
|
||||
impl RocksDBBatch {
|
||||
/// Puts a value into the batch (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value into the batch using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Deletes a value from the batch (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Commits the batch (stub implementation).
|
||||
pub const fn commit(self) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub iterator for `RocksDB` (non-transactional).
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBIter<'a, T> {
|
||||
_marker: std::marker::PhantomData<(&'a (), T)>,
|
||||
}
|
||||
|
||||
impl<T: Table> Iterator for RocksDBIter<'_, T> {
|
||||
type Item = ProviderResult<(T::Key, T::Value)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub builder for `RocksDB` on non-Unix platforms.
|
||||
/// A stub builder for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksDBBuilder;
|
||||
|
||||
@@ -167,7 +73,7 @@ impl RocksDBBuilder {
|
||||
}
|
||||
|
||||
/// Adds a column family for a specific table type (stub implementation).
|
||||
pub const fn with_table<T: Table>(self) -> Self {
|
||||
pub const fn with_table<T>(self) -> Self {
|
||||
self
|
||||
}
|
||||
|
||||
@@ -205,71 +111,3 @@ impl RocksDBBuilder {
|
||||
/// A stub transaction for `RocksDB`.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTx;
|
||||
|
||||
impl RocksTx {
|
||||
/// Gets a value from the specified table (stub implementation).
|
||||
pub fn get<T: Table>(&self, _key: T::Key) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Gets a value using pre-encoded key (stub implementation).
|
||||
pub const fn get_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
) -> ProviderResult<Option<T::Value>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value into the specified table (stub implementation).
|
||||
pub fn put<T: Table>(&self, _key: T::Key, _value: &T::Value) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Puts a value using pre-encoded key (stub implementation).
|
||||
pub const fn put_encoded<T: Table>(
|
||||
&self,
|
||||
_key: &<T::Key as Encode>::Encoded,
|
||||
_value: &T::Value,
|
||||
) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Deletes a value from the specified table (stub implementation).
|
||||
pub fn delete<T: Table>(&self, _key: T::Key) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates an iterator for the specified table (stub implementation).
|
||||
pub const fn iter<T: Table>(&self) -> ProviderResult<RocksTxIter<'_, T>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Creates an iterator starting from the given key (stub implementation).
|
||||
pub fn iter_from<T: Table>(&self, _key: T::Key) -> ProviderResult<RocksTxIter<'_, T>> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Commits the transaction (stub implementation).
|
||||
pub const fn commit(self) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
|
||||
/// Rolls back the transaction (stub implementation).
|
||||
pub const fn rollback(self) -> ProviderResult<()> {
|
||||
Err(UnsupportedProvider)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stub iterator for `RocksDB` transactions.
|
||||
#[derive(Debug)]
|
||||
pub struct RocksTxIter<'a, T> {
|
||||
_marker: std::marker::PhantomData<(&'a (), T)>,
|
||||
}
|
||||
|
||||
impl<T: Table> Iterator for RocksTxIter<'_, T> {
|
||||
type Item = ProviderResult<(T::Key, T::Value)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,3 +141,7 @@ harness = false
|
||||
name = "hashed_state"
|
||||
harness = false
|
||||
required-features = ["rayon"]
|
||||
|
||||
[[bench]]
|
||||
name = "merge_strategies"
|
||||
harness = false
|
||||
|
||||
131
crates/trie/common/benches/merge_strategies.rs
Normal file
131
crates/trie/common/benches/merge_strategies.rs
Normal file
@@ -0,0 +1,131 @@
|
||||
#![allow(missing_docs, unreachable_pub)]
|
||||
|
||||
use alloy_primitives::{B256, U256};
|
||||
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
|
||||
use reth_primitives_traits::Account;
|
||||
use reth_trie_common::{HashedPostState, HashedPostStateSorted, HashedStorage, HashedStorageSorted};
|
||||
use std::collections::HashMap;
|
||||
|
||||
fn keccak256_mock(n: u64) -> B256 {
|
||||
let mut bytes = [0u8; 32];
|
||||
bytes[24..].copy_from_slice(&n.to_be_bytes());
|
||||
for (i, item) in bytes.iter_mut().enumerate() {
|
||||
*item ^= ((n.wrapping_mul(0x9e3779b97f4a7c15) >> (i * 8)) & 0xff) as u8;
|
||||
}
|
||||
B256::from(bytes)
|
||||
}
|
||||
|
||||
fn generate_hashed_post_state_sorted(
|
||||
base_offset: u64,
|
||||
num_accounts: usize,
|
||||
storage_slots_per_account: usize,
|
||||
) -> HashedPostStateSorted {
|
||||
let mut state = HashedPostState::default();
|
||||
|
||||
for i in 0..num_accounts {
|
||||
let hashed_address = keccak256_mock(base_offset + i as u64);
|
||||
let account = Some(Account {
|
||||
nonce: i as u64,
|
||||
balance: U256::from(i * 1000),
|
||||
bytecode_hash: None,
|
||||
});
|
||||
state.accounts.insert(hashed_address, account);
|
||||
|
||||
if storage_slots_per_account > 0 {
|
||||
let mut storage = HashedStorage::new(false);
|
||||
for j in 0..storage_slots_per_account {
|
||||
let slot = keccak256_mock(base_offset * 1000 + (i * 100 + j) as u64);
|
||||
storage.storage.insert(slot, U256::from(j));
|
||||
}
|
||||
state.storages.insert(hashed_address, storage);
|
||||
}
|
||||
}
|
||||
|
||||
state.into_sorted()
|
||||
}
|
||||
|
||||
fn generate_states(
|
||||
num_sources: usize,
|
||||
items_per_source: usize,
|
||||
) -> Vec<HashedPostStateSorted> {
|
||||
(0..num_sources)
|
||||
.map(|i| {
|
||||
generate_hashed_post_state_sorted(
|
||||
(i * 1_000_000) as u64,
|
||||
items_per_source,
|
||||
5,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn extend_ref_chain(states: &[HashedPostStateSorted]) -> HashedPostStateSorted {
|
||||
if states.is_empty() {
|
||||
return HashedPostStateSorted::default();
|
||||
}
|
||||
let mut result = states[0].clone();
|
||||
for state in &states[1..] {
|
||||
result.extend_ref(state);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn bench_merge_strategies(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("HashedPostStateSorted_merge");
|
||||
|
||||
// Test various source counts and items per source
|
||||
// We want to find where extend_ref beats kway_merge
|
||||
let configs = [
|
||||
// (num_sources, items_per_source)
|
||||
(5, 100),
|
||||
(5, 500),
|
||||
(5, 1000),
|
||||
(5, 2000),
|
||||
(5, 5000),
|
||||
(5, 10000),
|
||||
(10, 100),
|
||||
(10, 500),
|
||||
(10, 1000),
|
||||
(10, 2000),
|
||||
(10, 5000),
|
||||
(20, 100),
|
||||
(20, 500),
|
||||
(20, 1000),
|
||||
(20, 2000),
|
||||
];
|
||||
|
||||
for (num_sources, items_per_source) in configs {
|
||||
let states = generate_states(num_sources, items_per_source);
|
||||
let total_items = num_sources * items_per_source;
|
||||
let label = format!("{}src_{}items", num_sources, items_per_source);
|
||||
|
||||
group.throughput(Throughput::Elements(total_items as u64));
|
||||
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("kway_merge", &label),
|
||||
&states,
|
||||
|b, states| {
|
||||
b.iter(|| {
|
||||
let result = HashedPostStateSorted::merge_batch(black_box(states.iter()));
|
||||
black_box(result)
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
group.bench_with_input(
|
||||
BenchmarkId::new("extend_ref_new", &label),
|
||||
&states,
|
||||
|b, states| {
|
||||
b.iter(|| {
|
||||
let result = extend_ref_chain(black_box(states));
|
||||
black_box(result)
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_merge_strategies);
|
||||
criterion_main!(benches);
|
||||
@@ -636,30 +636,46 @@ impl HashedPostStateSorted {
|
||||
|
||||
/// Batch-merge sorted hashed post states. Iterator yields **newest to oldest**.
|
||||
///
|
||||
/// Uses k-way merge for O(n log k) complexity and one-pass accumulation for storages.
|
||||
/// Uses adaptive merge strategy:
|
||||
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
|
||||
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
|
||||
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
|
||||
pub fn merge_batch<'a>(states: impl IntoIterator<Item = &'a Self>) -> Self {
|
||||
let states: Vec<_> = states.into_iter().collect();
|
||||
if states.is_empty() {
|
||||
return Self::default();
|
||||
}
|
||||
match states.len() {
|
||||
0 => return Self::default(),
|
||||
1 => return states[0].clone(),
|
||||
n => {
|
||||
let total_items: usize = states.iter().map(|s| s.total_len()).sum();
|
||||
|
||||
if n >= crate::utils::KWAY_MIN_SOURCES {
|
||||
Self::merge_batch_kway(&states)
|
||||
} else if crate::utils::prefer_sorted_merge(n, total_items) {
|
||||
let mut result = states[0].clone();
|
||||
for state in &states[1..] {
|
||||
result.extend_ref(state);
|
||||
}
|
||||
result
|
||||
} else {
|
||||
Self::merge_batch_hashmap(&states)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// K-way merge implementation for many sources.
|
||||
fn merge_batch_kway<'a>(states: &[&'a Self]) -> Self {
|
||||
let accounts = kway_merge_sorted(states.iter().map(|s| s.accounts.as_slice()));
|
||||
|
||||
struct StorageAcc<'a> {
|
||||
/// Account storage was cleared (e.g., SELFDESTRUCT).
|
||||
wiped: bool,
|
||||
/// Stop collecting older slices after seeing a wipe.
|
||||
sealed: bool,
|
||||
/// Storage slot slices to merge, ordered newest to oldest.
|
||||
slices: Vec<&'a [(B256, U256)]>,
|
||||
}
|
||||
|
||||
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
|
||||
|
||||
// Accumulate storage slices per address from newest to oldest state.
|
||||
// Once we see a `wiped` flag, the account was cleared at that point,
|
||||
// so older storage slots are irrelevant - we "seal" and stop collecting.
|
||||
for state in &states {
|
||||
for state in states {
|
||||
for (addr, storage) in &state.storages {
|
||||
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
|
||||
wiped: false,
|
||||
@@ -690,6 +706,15 @@ impl HashedPostStateSorted {
|
||||
Self { accounts, storages }
|
||||
}
|
||||
|
||||
/// HashMap-based merge for small data with low per-element overhead.
|
||||
fn merge_batch_hashmap<'a>(states: &[&'a Self]) -> Self {
|
||||
let mut unsorted = HashedPostState::default();
|
||||
for state in states.iter().rev() {
|
||||
unsorted.extend_from_sorted(state);
|
||||
}
|
||||
unsorted.into_sorted()
|
||||
}
|
||||
|
||||
/// Clears all accounts and storage data.
|
||||
pub fn clear(&mut self) {
|
||||
self.accounts.clear();
|
||||
@@ -753,17 +778,49 @@ impl HashedStorageSorted {
|
||||
|
||||
/// Batch-merge sorted hashed storage. Iterator yields **newest to oldest**.
|
||||
/// If any update is wiped, prior data is discarded.
|
||||
///
|
||||
/// Uses adaptive merge strategy:
|
||||
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
|
||||
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
|
||||
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
|
||||
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
|
||||
let updates: Vec<_> = updates.into_iter().collect();
|
||||
if updates.is_empty() {
|
||||
return Self::default();
|
||||
}
|
||||
|
||||
let wipe_idx = updates.iter().position(|u| u.wiped);
|
||||
let relevant = wipe_idx.map_or(&updates[..], |idx| &updates[..=idx]);
|
||||
let storage_slots = kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()));
|
||||
|
||||
Self { wiped: wipe_idx.is_some(), storage_slots }
|
||||
match relevant.len() {
|
||||
0 => return Self::default(),
|
||||
1 => return Self { wiped: wipe_idx.is_some(), ..relevant[0].clone() },
|
||||
n => {
|
||||
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
|
||||
|
||||
let storage_slots = if n >= crate::utils::KWAY_MIN_SOURCES {
|
||||
kway_merge_sorted(relevant.iter().map(|u| u.storage_slots.as_slice()))
|
||||
} else if crate::utils::prefer_sorted_merge(n, total_items) {
|
||||
let mut result = relevant[0].storage_slots.clone();
|
||||
for update in &relevant[1..] {
|
||||
extend_sorted_vec(&mut result, &update.storage_slots);
|
||||
}
|
||||
result
|
||||
} else {
|
||||
Self::merge_batch_hashmap(relevant)
|
||||
};
|
||||
|
||||
Self { wiped: wipe_idx.is_some(), storage_slots }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_batch_hashmap(updates: &[&Self]) -> Vec<(B256, U256)> {
|
||||
let mut map: B256Map<U256> = B256Map::default();
|
||||
for update in updates.iter().rev() {
|
||||
for &(slot, value) in &update.storage_slots {
|
||||
map.insert(slot, value);
|
||||
}
|
||||
}
|
||||
let mut result: Vec<_> = map.into_iter().collect();
|
||||
result.sort_unstable_by_key(|(k, _)| *k);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -626,33 +626,45 @@ impl TrieUpdatesSorted {
|
||||
|
||||
/// Batch-merge sorted trie updates. Iterator yields **newest to oldest**.
|
||||
///
|
||||
/// This is more efficient than repeated `extend_ref` calls for large batches,
|
||||
/// using k-way merge for O(n log k) complexity instead of O(n * k).
|
||||
/// Uses adaptive merge strategy:
|
||||
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
|
||||
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
|
||||
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
|
||||
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
|
||||
let updates: Vec<_> = updates.into_iter().collect();
|
||||
if updates.is_empty() {
|
||||
return Self::default();
|
||||
}
|
||||
match updates.len() {
|
||||
0 => return Self::default(),
|
||||
1 => return updates[0].clone(),
|
||||
n => {
|
||||
let total_items: usize = updates.iter().map(|u| u.total_len()).sum();
|
||||
|
||||
// Merge account nodes using k-way merge. Newest (index 0) takes precedence.
|
||||
if n >= crate::utils::KWAY_MIN_SOURCES {
|
||||
Self::merge_batch_kway(&updates)
|
||||
} else if crate::utils::prefer_sorted_merge(n, total_items) {
|
||||
let mut result = updates[0].clone();
|
||||
for update in &updates[1..] {
|
||||
result.extend_ref(update);
|
||||
}
|
||||
result
|
||||
} else {
|
||||
Self::merge_batch_hashmap(&updates)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_batch_kway<'a>(updates: &[&'a Self]) -> Self {
|
||||
let account_nodes = kway_merge_sorted(updates.iter().map(|u| u.account_nodes.as_slice()));
|
||||
|
||||
// Accumulator for collecting storage trie slices per address.
|
||||
// We process updates newest-to-oldest and stop collecting for an address
|
||||
// once we hit a "deleted" storage (sealed=true), since older data is irrelevant.
|
||||
struct StorageAcc<'a> {
|
||||
/// Storage trie was deleted (account removed or cleared).
|
||||
is_deleted: bool,
|
||||
/// Stop collecting older slices after seeing a deletion.
|
||||
sealed: bool,
|
||||
/// Storage trie node slices to merge, ordered newest to oldest.
|
||||
slices: Vec<&'a [(Nibbles, Option<BranchNodeCompact>)]>,
|
||||
}
|
||||
|
||||
let mut acc: B256Map<StorageAcc<'_>> = B256Map::default();
|
||||
|
||||
// Collect storage slices per address, respecting deletion boundaries
|
||||
for update in &updates {
|
||||
for update in updates {
|
||||
for (addr, storage) in &update.storage_tries {
|
||||
let entry = acc.entry(*addr).or_insert_with(|| StorageAcc {
|
||||
is_deleted: false,
|
||||
@@ -660,14 +672,12 @@ impl TrieUpdatesSorted {
|
||||
slices: Vec::new(),
|
||||
});
|
||||
|
||||
// Skip if we already hit a deletion for this address (older data is irrelevant)
|
||||
if entry.sealed {
|
||||
continue;
|
||||
}
|
||||
|
||||
entry.slices.push(storage.storage_nodes.as_slice());
|
||||
|
||||
// If this storage was deleted, mark as deleted and seal to ignore older updates
|
||||
if storage.is_deleted {
|
||||
entry.is_deleted = true;
|
||||
entry.sealed = true;
|
||||
@@ -675,7 +685,6 @@ impl TrieUpdatesSorted {
|
||||
}
|
||||
}
|
||||
|
||||
// Merge each address's storage slices using k-way merge
|
||||
let storage_tries = acc
|
||||
.into_iter()
|
||||
.map(|(addr, entry)| {
|
||||
@@ -686,6 +695,14 @@ impl TrieUpdatesSorted {
|
||||
|
||||
Self { account_nodes, storage_tries }
|
||||
}
|
||||
|
||||
fn merge_batch_hashmap<'a>(updates: &[&'a Self]) -> Self {
|
||||
let mut unsorted = TrieUpdates::default();
|
||||
for update in updates.iter().rev() {
|
||||
unsorted.extend_from_sorted(update);
|
||||
}
|
||||
unsorted.into_sorted()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<Self> for TrieUpdatesSorted {
|
||||
@@ -780,18 +797,51 @@ impl StorageTrieUpdatesSorted {
|
||||
|
||||
/// Batch-merge sorted storage trie updates. Iterator yields **newest to oldest**.
|
||||
/// If any update is deleted, older data is discarded.
|
||||
///
|
||||
/// Uses adaptive merge strategy:
|
||||
/// - k >= 30 sources: k-way merge (avoids O(k) copying amplification)
|
||||
/// - Large avg items/source (>= 2000): pairwise extend_ref (better cache locality)
|
||||
/// - Otherwise: HashMap merge then sort (lower per-element overhead for small data)
|
||||
pub fn merge_batch<'a>(updates: impl IntoIterator<Item = &'a Self>) -> Self {
|
||||
let updates: Vec<_> = updates.into_iter().collect();
|
||||
if updates.is_empty() {
|
||||
return Self::default();
|
||||
}
|
||||
|
||||
// Discard updates older than the first deletion since the trie was wiped at that point.
|
||||
let del_idx = updates.iter().position(|u| u.is_deleted);
|
||||
let relevant = del_idx.map_or(&updates[..], |idx| &updates[..=idx]);
|
||||
let storage_nodes = kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()));
|
||||
|
||||
Self { is_deleted: del_idx.is_some(), storage_nodes }
|
||||
match relevant.len() {
|
||||
0 => return Self::default(),
|
||||
1 => return Self { is_deleted: del_idx.is_some(), ..relevant[0].clone() },
|
||||
n => {
|
||||
let total_items: usize = relevant.iter().map(|u| u.len()).sum();
|
||||
|
||||
let storage_nodes = if n >= crate::utils::KWAY_MIN_SOURCES {
|
||||
kway_merge_sorted(relevant.iter().map(|u| u.storage_nodes.as_slice()))
|
||||
} else if crate::utils::prefer_sorted_merge(n, total_items) {
|
||||
let mut result = relevant[0].storage_nodes.clone();
|
||||
for update in &relevant[1..] {
|
||||
extend_sorted_vec(&mut result, &update.storage_nodes);
|
||||
}
|
||||
result
|
||||
} else {
|
||||
Self::merge_batch_hashmap(relevant)
|
||||
};
|
||||
|
||||
Self { is_deleted: del_idx.is_some(), storage_nodes }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_batch_hashmap(
|
||||
updates: &[&Self],
|
||||
) -> Vec<(Nibbles, Option<BranchNodeCompact>)> {
|
||||
let mut map: HashMap<Nibbles, Option<BranchNodeCompact>> = HashMap::default();
|
||||
for update in updates.iter().rev() {
|
||||
for (nibbles, node) in &update.storage_nodes {
|
||||
map.insert(*nibbles, node.clone());
|
||||
}
|
||||
}
|
||||
let mut result: Vec<_> = map.into_iter().collect();
|
||||
result.sort_unstable_by_key(|(k, _)| *k);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,22 @@ use alloc::vec::Vec;
|
||||
use core::cmp::Ordering;
|
||||
use itertools::Itertools;
|
||||
|
||||
/// Minimum average items per source to prefer pairwise sorted merge over HashMap merge.
pub(crate) const PAIRWISE_MIN_AVG_ITEMS: usize = 2000;

/// Minimum number of sources that triggers k-way merge instead of pairwise sorted merge.
pub(crate) const KWAY_MIN_SOURCES: usize = 30;

/// Returns true if pairwise sorted merge is preferred over HashMap merge.
///
/// Returns false when:
/// - `num_sources` is zero (a pairwise merge has nothing to seed from),
/// - `num_sources >= KWAY_MIN_SOURCES` (use k-way merge), or
/// - the average number of items per source is below `PAIRWISE_MIN_AVG_ITEMS` (use HashMap).
#[inline]
pub(crate) fn prefer_sorted_merge(num_sources: usize, total_items: usize) -> bool {
    if num_sources == 0 || num_sources >= KWAY_MIN_SOURCES {
        return false;
    }
    // Compare totals rather than dividing; saturate so the product cannot wrap.
    total_items >= PAIRWISE_MIN_AVG_ITEMS.saturating_mul(num_sources)
}
|
||||
|
||||
/// Merge sorted slices into a sorted `Vec`. First occurrence wins for duplicate keys.
|
||||
///
|
||||
/// Callers pass slices in priority order (index 0 = highest priority), so the first
|
||||
@@ -26,45 +42,56 @@ where
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Extend a sorted vector with another sorted vector using O(n+m) merge.
/// Values from `other` take precedence for duplicate keys.
///
/// Both `target` and `other` are expected to be sorted by key; the result is written back
/// into `target`. Elements already in `target` are moved, elements of `other` are cloned.
pub(crate) fn extend_sorted_vec<K, V>(target: &mut Vec<(K, V)>, other: &[(K, V)])
where
    K: Clone + Ord,
    V: Clone,
{
    // Nothing to merge in.
    let first_new = match other.first() {
        Some((key, _)) => key,
        None => return,
    };

    // Fast path: `target` is empty or ends strictly before `other` starts - just append.
    if target.last().map_or(true, |(last_key, _)| last_key < first_new) {
        target.extend_from_slice(other);
        return;
    }

    let mut merged = Vec::with_capacity(target.len() + other.len());

    // Drain `target` so its elements are moved into the output instead of cloned.
    let mut existing = target.drain(..).peekable();
    let mut incoming = other.iter().peekable();

    while let (Some(old), Some(new)) = (existing.peek(), incoming.peek()) {
        match old.0.cmp(&new.0) {
            Ordering::Less => merged.extend(existing.next()),
            Ordering::Greater => merged.extend(incoming.next().cloned()),
            Ordering::Equal => {
                // `other` takes precedence for duplicate keys - reuse the key we already own.
                if let (Some((key, _)), Some((_, value))) = (existing.next(), incoming.next()) {
                    merged.push((key, value.clone()));
                }
            }
        }
    }

    // At most one side still has elements: `existing` moves, `incoming` clones.
    merged.extend(existing);
    merged.extend(incoming.cloned());

    *target = merged;
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -79,6 +106,55 @@ mod tests {
|
||||
assert_eq!(target, vec![(1, "a"), (2, "b"), (3, "c_new")]);
|
||||
}
|
||||
|
||||
/// An empty target ends up holding exactly the entries of `other`.
#[test]
fn test_extend_sorted_vec_empty_target() {
    let incoming = [(1, "a"), (2, "b")];
    let mut merged: Vec<(i32, &str)> = Vec::new();

    extend_sorted_vec(&mut merged, &incoming);

    assert_eq!(merged, incoming);
}
|
||||
|
||||
/// Extending with an empty `other` leaves the target untouched.
#[test]
fn test_extend_sorted_vec_empty_other() {
    let nothing: [(i32, &str); 0] = [];
    let mut merged = vec![(1, "a"), (2, "b")];

    extend_sorted_vec(&mut merged, &nothing);

    assert_eq!(merged, [(1, "a"), (2, "b")]);
}
|
||||
|
||||
/// When every key collides, every value is replaced by the one from `other`.
#[test]
fn test_extend_sorted_vec_all_duplicates() {
    let replacements = [(1, "new1"), (2, "new2"), (3, "new3")];
    let mut merged = vec![(1, "old1"), (2, "old2"), (3, "old3")];

    extend_sorted_vec(&mut merged, &replacements);

    // `other` wins on each duplicate key, so the result equals `other`.
    assert_eq!(merged, [(1, "new1"), (2, "new2"), (3, "new3")]);
}
|
||||
|
||||
/// Keys alternating between the two inputs come out in one sorted sequence.
#[test]
fn test_extend_sorted_vec_interleaved() {
    let evens = [(2, "b"), (4, "d"), (6, "f")];
    let mut merged = vec![(1, "a"), (3, "c"), (5, "e")];

    extend_sorted_vec(&mut merged, &evens);

    let expected = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")];
    assert_eq!(merged, expected);
}
|
||||
|
||||
/// Entries of `other` that all sort before the target are placed in front of it.
#[test]
fn test_extend_sorted_vec_other_all_smaller() {
    let lower = [(1, "a"), (2, "b")];
    let mut merged = vec![(5, "e"), (6, "f")];

    extend_sorted_vec(&mut merged, &lower);

    assert_eq!(merged, [(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
|
||||
|
||||
/// Entries of `other` that all sort after the target are appended behind it.
#[test]
fn test_extend_sorted_vec_other_all_larger() {
    let higher = [(5, "e"), (6, "f")];
    let mut merged = vec![(1, "a"), (2, "b")];

    extend_sorted_vec(&mut merged, &higher);

    assert_eq!(merged, [(1, "a"), (2, "b"), (5, "e"), (6, "f")]);
}
|
||||
|
||||
#[test]
|
||||
fn test_kway_merge_sorted_basic() {
|
||||
let slice1 = vec![(1, "a1"), (3, "c1")];
|
||||
@@ -123,4 +199,24 @@ mod tests {
|
||||
let result: Vec<(i32, &str)> = kway_merge_sorted(Vec::<&[(i32, &str)]>::new());
|
||||
assert!(result.is_empty());
|
||||
}
|
||||
|
||||
/// Source counts of 30 and above are rejected; 29 sources with enough items are accepted.
#[test]
fn test_prefer_sorted_merge_kway_threshold() {
    for (sources, items) in [(30, 100_000), (50, 200_000)] {
        assert!(!prefer_sorted_merge(sources, items));
    }

    let sources = 29;
    assert!(prefer_sorted_merge(sources, sources * PAIRWISE_MIN_AVG_ITEMS));
}
|
||||
|
||||
/// Checks the item-count boundary: exactly at and above it pass, one item below fails.
#[test]
fn test_prefer_sorted_merge_pairwise_threshold() {
    let at_threshold = prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS);
    let above_threshold = prefer_sorted_merge(10, 10 * PAIRWISE_MIN_AVG_ITEMS + 1);
    let below_threshold = prefer_sorted_merge(5, 5 * PAIRWISE_MIN_AVG_ITEMS - 1);

    assert!(at_threshold);
    assert!(above_threshold);
    assert!(!below_threshold);
}
|
||||
|
||||
/// Small inputs never select the pairwise sorted merge.
#[test]
fn test_prefer_sorted_merge_small_data() {
    for (sources, items) in [(2, 100), (5, 1000)] {
        assert!(!prefer_sorted_merge(sources, items));
    }
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user