Compare commits

..

1 Commits

Author SHA1 Message Date
Dan Cline
299b7f1e74 wip: vibed tx clone fn 2025-12-18 19:41:59 -05:00
20 changed files with 393 additions and 682 deletions

2
Cargo.lock generated
View File

@@ -8247,7 +8247,6 @@ dependencies = [
"metrics",
"metrics-util",
"mini-moka",
"moka",
"parking_lot",
"proptest",
"rand 0.8.5",
@@ -10584,7 +10583,6 @@ dependencies = [
"reth-stages-api",
"reth-static-file",
"reth-static-file-types",
"reth-storage-api",
"reth-storage-errors",
"reth-testing-utils",
"reth-trie",

View File

@@ -587,7 +587,6 @@ url = { version = "2.3", default-features = false }
zstd = "0.13"
byteorder = "1"
mini-moka = "0.10"
moka = "0.12"
tar-no-std = { version = "0.3.2", default-features = false }
miniz_oxide = { version = "0.8.4", default-features = false }
chrono = "0.4.41"

View File

@@ -53,7 +53,6 @@ futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
mini-moka = { workspace = true, features = ["sync"] }
moka = { workspace = true, features = ["sync"] }
smallvec.workspace = true
# metrics

View File

@@ -393,7 +393,7 @@ where
metrics,
terminate_execution,
precompile_cache_disabled,
precompile_cache_map,
mut precompile_cache_map,
} = self;
let mut state_provider = match provider.build() {

View File

@@ -1,56 +1,50 @@
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
use alloy_primitives::Bytes;
use dashmap::DashMap;
use parking_lot::Mutex;
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
use revm_primitives::Address;
use std::{hash::Hash, sync::Arc};
use schnellru::LruMap;
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
};
/// Default max cache size for [`PrecompileCache`]
const MAX_CACHE_SIZE: u32 = 10_000;
/// Stores caches for each precompile.
#[derive(Debug, Clone, Default)]
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
impl<S> PrecompileCacheMap<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
// Try just using `.get` first to avoid acquiring a write lock.
if let Some(cache) = self.0.get(&address) {
return cache.clone();
}
// Otherwise, fallback to `.entry` and initialize the cache.
//
// This should be very rare as caches for all precompiles will be initialized as soon as
// first EVM is created.
pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
self.0.entry(address).or_default().clone()
}
}
/// Cache for precompiles, for each input stores the result.
///
/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
/// so we use a [`Mutex`] instead of an `RwLock`.
#[derive(Debug, Clone)]
pub struct PrecompileCache<S>(
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
)
pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
impl<S> Default for PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn default() -> Self {
Self(
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
.initial_capacity(MAX_CACHE_SIZE as usize)
.build_with_hasher(Default::default()),
)
Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
}
}
@@ -58,31 +52,63 @@ impl<S> PrecompileCache<S>
where
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
{
fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
self.0.get(input).filter(|e| e.spec == spec)
fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
self.0.lock().get(key).cloned()
}
/// Inserts the given key and value into the cache, returning the new cache size.
fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
self.0.insert(input, value);
self.0.entry_count() as usize
fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
let mut cache = self.0.lock();
cache.insert(key, value);
cache.len()
}
}
/// Cache key, spec id and precompile call input. spec id is included in the key to account for
/// precompile repricing across fork activations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheKey<S>((S, Bytes));
impl<S> CacheKey<S> {
const fn new(spec_id: S, input: Bytes) -> Self {
Self((spec_id, input))
}
}
/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
impl<'a, S> CacheKeyRef<'a, S> {
const fn new(spec_id: S, input: &'a [u8]) -> Self {
Self((spec_id, input))
}
}
impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
fn eq(&self, other: &CacheKey<S>) -> bool {
self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
}
}
impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0 .0.hash(state);
self.0 .1.hash(state);
}
}
/// Cache entry, precompile successful output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheEntry<S> {
output: PrecompileOutput,
spec: S,
}
pub struct CacheEntry(PrecompileOutput);
impl<S> CacheEntry<S> {
impl CacheEntry {
const fn gas_used(&self) -> u64 {
self.output.gas_used
self.0.gas_used
}
fn to_precompile_result(&self) -> PrecompileResult {
Ok(self.output.clone())
Ok(self.0.clone())
}
}
@@ -164,7 +190,9 @@ where
}
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
if let Some(entry) = &self.cache.get(&key) {
self.increment_by_one_precompile_cache_hits();
if input.gas >= entry.gas_used() {
return entry.to_precompile_result()
@@ -176,10 +204,8 @@ where
match &result {
Ok(output) => {
let size = self.cache.insert(
Bytes::copy_from_slice(calldata),
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
);
let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
let size = self.cache.insert(key, CacheEntry(output.clone()));
self.set_precompile_cache_size_metric(size as f64);
self.increment_by_one_precompile_cache_misses();
}
@@ -220,12 +246,31 @@ impl CachedPrecompileMetrics {
#[cfg(test)]
mod tests {
use std::hash::DefaultHasher;
use super::*;
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
use reth_revm::db::EmptyDB;
use revm::{context::TxEnv, precompile::PrecompileOutput};
use revm_primitives::hardfork::SpecId;
#[test]
fn test_cache_key_ref_hash() {
let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
assert!(PartialEq::eq(&key2, &key1));
let mut hasher = DefaultHasher::new();
key1.hash(&mut hasher);
let hash1 = hasher.finish();
let mut hasher = DefaultHasher::new();
key2.hash(&mut hasher);
let hash2 = hasher.finish();
assert_eq!(hash1, hash2);
}
#[test]
fn test_precompile_cache_basic() {
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
@@ -248,11 +293,12 @@ mod tests {
reverted: false,
};
let input = b"test_input";
let expected = CacheEntry { output, spec: SpecId::PRAGUE };
cache.cache.insert(input.into(), expected.clone());
let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
let expected = CacheEntry(output);
cache.cache.insert(key, expected.clone());
let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
let actual = cache.cache.get(&key).unwrap();
assert_eq!(actual, expected);
}
@@ -266,7 +312,7 @@ mod tests {
let address1 = Address::repeat_byte(1);
let address2 = Address::repeat_byte(2);
let cache_map = PrecompileCacheMap::default();
let mut cache_map = PrecompileCacheMap::default();
// create the first precompile with a specific output
let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {

View File

@@ -75,7 +75,6 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-downloaders.workspace = true
reth-static-file.workspace = true
reth-stages-api = { workspace = true, features = ["test-utils"] }
reth-storage-api.workspace = true
reth-testing-utils.workspace = true
reth-trie = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
@@ -117,7 +116,6 @@ test-utils = [
"reth-ethereum-primitives?/test-utils",
"reth-evm-ethereum/test-utils",
]
rocksdb = ["reth-provider/rocksdb"]
[[bench]]
name = "criterion"

View File

@@ -3,16 +3,17 @@ use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db_api::{
table::{Decode, Decompress, Value},
cursor::{DbCursorRO, DbCursorRW},
table::Value,
tables,
transaction::DbTxMut,
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -64,9 +65,7 @@ where
+ PruneCheckpointReader
+ StatsReader
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
+ TransactionsProviderExt
+ StorageSettingsCache
+ RocksDBProviderFactory,
+ TransactionsProviderExt,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -151,27 +150,16 @@ where
);
if range_output.is_final_range {
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
// Use append mode when table is empty (first sync) - significantly faster
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
let mut txhash_cursor = provider
.tx_ref()
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash_bytes, number_bytes) = hash_to_number?;
let (hash, number) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
@@ -181,16 +169,12 @@ where
);
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
let key = RawKey::<TxHash>::from_vec(hash);
if append_only {
txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
} else {
txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
}
}
trace!(target: "sync::stages::transaction_lookup",
@@ -215,19 +199,11 @@ where
provider: &Provider,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
// Cursor to unwind tx hash to number
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
@@ -242,18 +218,15 @@ where
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
// First delete the transaction and hash to id mapping
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
{
tx_hash_number_cursor.delete_current()?;
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
@@ -293,7 +266,7 @@ mod tests {
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_db_api::transaction::DbTx;
use reth_ethereum_primitives::Block;
use reth_primitives_traits::SealedBlock;
use reth_provider::{
@@ -608,160 +581,4 @@ mod tests {
self.ensure_no_hash_by_block(input.unwind_to)
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// writes transaction hash mappings to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
assert_eq!(
mdbx_count, 0,
"MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = runner.db.factory.rocksdb_provider();
let mut rocksdb_count = 0;
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
rocksdb_count += 1;
}
}
assert_eq!(
rocksdb_count, expected_tx_count,
"RocksDB should contain all transaction hashes"
);
}
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
/// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
#[tokio::test]
async fn unwind_deletes_from_rocksdb_when_enabled() {
let (previous_stage, stage_progress) = (110, 100);
let mut rng = generators::rng();
// Set up the runner
let runner = TransactionLookupTestRunner::default();
// Enable RocksDB for transaction hash numbers
runner.db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
// Insert blocks with transactions
let blocks = random_block_range(
&mut rng,
stage_progress + 1..=previous_stage,
BlockRangeParams {
parent: Some(B256::ZERO),
tx_count: 1..3, // Ensure we have transactions
..Default::default()
},
);
runner
.db
.insert_blocks(blocks.iter(), StorageKind::Static)
.expect("failed to insert blocks");
// Count expected transactions
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
assert!(expected_tx_count > 0, "test requires at least one transaction");
// Execute the stage first to populate RocksDB
let exec_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let rx = runner.execute(exec_input);
let result = rx.await.unwrap();
assert!(result.is_ok(), "stage execution failed: {:?}", result);
// Verify RocksDB has the data before unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_some(),
"Transaction hash {:?} should exist before unwind",
hash
);
}
}
// Now unwind to stage_progress (removing all the blocks we added)
let unwind_input = UnwindInput {
checkpoint: StageCheckpoint::new(previous_stage),
unwind_to: stage_progress,
bad_block: None,
};
let unwind_result = runner.unwind(unwind_input).await;
assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
// Verify RocksDB data is deleted after unwind
let rocksdb = runner.db.factory.rocksdb_provider();
for block in &blocks {
for tx in &block.body().transactions {
let hash = *tx.tx_hash();
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
assert!(
result.is_none(),
"Transaction hash {:?} should be deleted from RocksDB after unwind",
hash
);
}
}
}
}
}

View File

@@ -50,7 +50,7 @@ impl Default for TestStageDB {
create_test_rw_db(),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}
@@ -68,7 +68,7 @@ impl TestStageDB {
create_test_rw_db_with_path(path),
MAINNET.clone(),
StaticFileProvider::read_write(static_dir_path).unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
)
.expect("failed to create test provider factory"),
}

View File

@@ -12936,123 +12936,122 @@ int mdbx_txn_renew(MDBX_txn *txn) {
return LOG_IFERR(rc);
}
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
if (unlikely(!dest))
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out) {
if (unlikely(!out))
return LOG_IFERR(MDBX_EINVAL);
*dest = nullptr;
*out = nullptr;
int rc = check_txn(src, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely(!src))
return LOG_IFERR(MDBX_EINVAL);
if (unlikely(src->signature != txn_signature))
return LOG_IFERR(MDBX_EBADSIGN);
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
return LOG_IFERR(MDBX_EINVAL);
return LOG_IFERR(MDBX_BAD_TXN);
if (unlikely(src->flags & (MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_PARKED)))
return LOG_IFERR(MDBX_BAD_TXN);
MDBX_env *const env = src->env;
rc = check_env(env, true);
int rc = check_env(env, true);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
if (unlikely(!env->lck_mmap.lck))
return LOG_IFERR(MDBX_EPERM);
const txnid_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease);
if (unlikely(src->txnid < snap_oldest))
return LOG_IFERR(MDBX_MVCC_RETARDED);
MDBX_txn *txn = nullptr;
const intptr_t bitmap_bytes =
#if MDBX_ENABLE_DBI_SPARSE
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(src->dbi_sparse[0])) / CHAR_BIT;
#else
0;
#endif /* MDBX_ENABLE_DBI_SPARSE */
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
const size_t base = sizeof(MDBX_txn) - sizeof(src->tw) + sizeof(src->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(src->dbi_seqs[0]) +
env->max_dbi * (sizeof(src->dbs[0]) + sizeof(src->cursors[0]) + sizeof(src->dbi_state[0]));
txn = osal_malloc(size);
if (unlikely(txn == nullptr))
MDBX_txn *clone = osal_malloc(size);
if (unlikely(clone == nullptr))
return LOG_IFERR(MDBX_ENOMEM);
#if MDBX_DEBUG
memset(txn, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
memset(clone, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(clone, size);
#endif /* MDBX_DEBUG */
MDBX_ANALYSIS_ASSUME(size > base);
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
txn->dbs = ptr_disp(txn, base);
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
#if MDBX_DEBUG
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
#endif
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
memset(clone, 0, base);
clone->dbs = ptr_disp(clone, base);
clone->cursors = ptr_disp(clone->dbs, env->max_dbi * sizeof(clone->dbs[0]));
clone->dbi_state = ptr_disp(clone, size - env->max_dbi * sizeof(clone->dbi_state[0]));
clone->dbi_seqs = ptr_disp(clone->cursors, env->max_dbi * sizeof(clone->cursors[0]));
#if MDBX_ENABLE_DBI_SPARSE
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
clone->dbi_sparse = ptr_disp(clone->dbi_state, -bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
txn->env = env;
txn->flags = src->flags & ~txn_state_flags;
txn->parent = nullptr;
txn->nested = nullptr;
clone->flags = src->flags;
clone->env = env;
clone->parent = nullptr;
clone->nested = nullptr;
txn->txnid = src->txnid;
txn->front_txnid = src->front_txnid;
txn->geo = src->geo;
txn->canary = src->canary;
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
if (unlikely(src->n_dbi > env->max_dbi)) {
rc = MDBX_CORRUPTED;
goto bailout;
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
osal_free(clone);
return LOG_IFERR(brs.err);
}
clone->to.reader = brs.rslot;
txn->n_dbi = src->n_dbi;
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
const uint32_t snapshot_pages_used = src->to.reader ? atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed)
: (uint32_t)src->geo.first_unallocated;
const uint64_t snapshot_pages_retired =
src->to.reader ? atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed) : 0;
atomic_store32(&clone->to.reader->snapshot_pages_used, snapshot_pages_used, mo_Relaxed);
atomic_store64(&clone->to.reader->snapshot_pages_retired, snapshot_pages_retired, mo_Relaxed);
safe64_write(&clone->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
clone->txnid = src->txnid;
clone->front_txnid = src->front_txnid;
clone->geo = src->geo;
clone->canary = src->canary;
clone->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
const size_t n_dbi = src->n_dbi;
clone->n_dbi = n_dbi;
memcpy(clone->dbs, src->dbs, n_dbi * sizeof(clone->dbs[0]));
memcpy(clone->dbi_seqs, src->dbi_seqs, n_dbi * sizeof(clone->dbi_seqs[0]));
memcpy(clone->dbi_state, src->dbi_state, n_dbi * sizeof(clone->dbi_state[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memset(txn->dbi_sparse, 0, bitmap_bytes);
memcpy(clone->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
#if MDBX_ENABLE_DBI_SPARSE
if (bitmap_bytes)
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memset(clone->cursors, 0, env->max_dbi * sizeof(clone->cursors[0]));
clone->userctx = nullptr;
txn->to.reader = nullptr;
if (env->lck_mmap.lck) {
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
rc = brs.err;
goto bailout;
}
txn->to.reader = brs.rslot;
safe64_reset(&txn->to.reader->txnid, true);
if (src->to.reader) {
atomic_store32(&txn->to.reader->snapshot_pages_used,
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired,
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
} else {
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
}
safe64_write(&txn->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
#if defined(_WIN32) || defined(_WIN64)
const size_t used_bytes = pgno2bytes(env, clone->geo.first_unallocated);
if (((used_bytes > env->geo_in_bytes.lower && env->geo_in_bytes.shrink) ||
(globals.running_under_Wine && used_bytes < env->geo_in_bytes.upper && env->geo_in_bytes.grow)) &&
(clone->flags & MDBX_NOSTICKYTHREADS) == 0) {
clone->flags |= txn_shrink_allowed;
imports.srwl_AcquireShared(&env->remap_guard);
}
#endif /* Windows */
txn->signature = txn_signature;
txn->userctx = nullptr;
*dest = txn;
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
dxb_sanitize_tail(env, clone);
clone->signature = txn_signature;
DEBUG("clone txn %" PRIaTXN " %p from %p on env %p, root page %" PRIaPGNO "/%" PRIaPGNO, clone->txnid, (void *)clone,
(void *)src, (void *)env, clone->dbs[MAIN_DBI].root, clone->dbs[FREE_DBI].root);
*out = clone;
return MDBX_SUCCESS;
bailout:
osal_free(txn);
return LOG_IFERR(rc);
}
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {

View File

@@ -3882,35 +3882,6 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
void *context);
/** \brief Clone a read-only transaction snapshot.
* \ingroup c_transactions
*
* Creates a new read-only transaction that uses the same MVCC snapshot as
* the \p src transaction. This allows parallel read operations across threads
* without re-opening a read transaction and re-fetching state.
*
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
* to be enabled for the environment. Otherwise it will return
* \ref MDBX_TXN_OVERLAPPING.
*
* \note The \p src transaction must be an active read-only transaction.
*
* \note The \p src transaction and the cloned transaction must not be used
* concurrently from multiple threads. Each transaction and its cursors must
* be confined to a single thread at a time.
*
* \param [in] src A read-only transaction handle returned by
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
* stored. Must not be NULL.
*
* \returns A non-zero error value on failure and 0 on success.
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
* \retval MDBX_READERS_FULL Reader lock table is full.
* \retval MDBX_ENOMEM Out of memory. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
/** \brief Create a transaction for use with the environment.
* \ingroup c_transactions
*
@@ -4432,6 +4403,49 @@ LIBMDBX_API int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted);
* \retval MDBX_EINVAL Transaction handle is NULL. */
LIBMDBX_API int mdbx_txn_renew(MDBX_txn *txn);
/** \brief Clone a read-only transaction.
* \ingroup c_transactions
*
* This creates a new read-only transaction that uses the same MVCC snapshot
* as the source transaction. The cloned transaction can be used independently
* and concurrently with the source transaction (but each transaction must
* still only be used by one thread at a time).
*
* This is useful for parallelizing read operations across multiple threads
* while ensuring all threads see a consistent view of the database.
*
* \note Only read-only transactions can be cloned.
*
* \note The cloned transaction must be aborted or committed independently
* of the source transaction.
*
* \note The source transaction must remain valid (not aborted, reset, or
* committed) for the duration of this call.
*
* \note In sticky-thread mode (the default), the cloned transaction is bound
* to the thread that calls this function. In \ref MDBX_NOSTICKYTHREADS mode,
* the cloned transaction can be used from any thread.
*
* \param [in] src A read-only transaction handle to clone.
* \param [out] out Address where the new \ref MDBX_txn handle will be stored.
*
* \returns A non-zero error value on failure and 0 on success,
* some possible errors are:
* \retval MDBX_PANIC A fatal error occurred earlier and the
* environment must be shut down.
* \retval MDBX_BAD_TXN The source transaction is not read-only,
* has already finished, is parked, or in error.
* \retval MDBX_EBADSIGN Transaction object has invalid signature.
* \retval MDBX_MVCC_RETARDED The MVCC snapshot used by source transaction
* has been reclaimed by a writer.
* \retval MDBX_READERS_FULL The reader lock table is full.
* See \ref mdbx_env_set_maxreaders().
* \retval MDBX_EPERM Environment opened in exclusive mode
* (no reader table available).
* \retval MDBX_ENOMEM Out of memory.
* \retval MDBX_EINVAL An invalid parameter was specified. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out);
/** \brief The fours integers markers (aka "canary") associated with the
* environment.
* \ingroup c_crud

View File

@@ -483,20 +483,6 @@ impl Transaction<RW> {
}
impl Transaction<RO> {
/// Clones this read-only transaction, preserving the same MVCC snapshot.
///
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
/// The cloned transaction must not be used concurrently with this transaction from multiple
/// threads.
pub fn clone_snapshot(&self) -> Result<Self> {
let cloned = self.txn_execute(|txn| {
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
})??;
Ok(Self::new_from_ptr(self.env().clone(), cloned))
}
/// Closes the database handle.
///
/// # Safety
@@ -507,6 +493,23 @@ impl Transaction<RO> {
Ok(())
}
/// Creates a new read-only transaction that uses the same MVCC snapshot as this transaction.
///
/// The cloned transaction can be used independently and concurrently with the source
/// transaction (but each transaction must still only be used by one thread at a time).
///
/// This is useful for parallelizing read operations across multiple threads while ensuring
/// all threads see a consistent view of the database.
pub fn clone_txn(&self) -> Result<Self> {
self.txn_execute(|txn| {
let mut out: *mut ffi::MDBX_txn = ptr::null_mut();
unsafe {
mdbx_result(ffi::mdbx_txn_clone(txn, &mut out))?;
Ok(Self::new_from_ptr(self.env().clone(), out))
}
})?
}
}
impl Transaction<RW> {

View File

@@ -375,33 +375,135 @@ fn test_stat_dupsort() {
}
#[test]
fn test_txn_clone_snapshot() {
fn test_clone_txn_same_snapshot() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let ro = env.begin_ro_txn().unwrap();
let clone = ro.clone_snapshot().unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
{
let txn = env.begin_rw_txn().unwrap();
let db = txn.open_db(None).unwrap();
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
}
let txn1 = env.begin_ro_txn().unwrap();
let txn1_id = txn1.id().unwrap();
let db = ro.open_db(None).unwrap();
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
let txn2 = txn1.clone_txn().unwrap();
let txn2_id = txn2.id().unwrap();
let db = clone.open_db(None).unwrap();
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
assert_eq!(txn1_id, txn2_id, "cloned txn should have same txnid");
let ro2 = env.begin_ro_txn().unwrap();
let db = ro2.open_db(None).unwrap();
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
let db1 = txn1.open_db(None).unwrap();
let db2 = txn2.open_db(None).unwrap();
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key2").unwrap(), Some(*b"val2"));
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key2").unwrap(), Some(*b"val2"));
}
#[test]
fn test_clone_txn_independent_lifetime() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
let txn1 = env.begin_ro_txn().unwrap();
let txn2 = txn1.clone_txn().unwrap();
let txn2_id = txn2.id().unwrap();
drop(txn1);
let db2 = txn2.open_db(None).unwrap();
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn2.id().unwrap(), txn2_id);
}
#[test]
fn test_clone_txn_sees_same_snapshot_after_write() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
let ro_txn = env.begin_ro_txn().unwrap();
let cloned_txn = ro_txn.clone_txn().unwrap();
{
let write_txn = env.begin_rw_txn().unwrap();
let db = write_txn.open_db(None).unwrap();
write_txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
write_txn.commit().unwrap();
}
let db1 = ro_txn.open_db(None).unwrap();
let db2 = cloned_txn.open_db(None).unwrap();
assert_eq!(ro_txn.get::<()>(db1.dbi(), b"key2").unwrap(), None);
assert_eq!(cloned_txn.get::<()>(db2.dbi(), b"key2").unwrap(), None);
let new_txn = env.begin_ro_txn().unwrap();
let new_db = new_txn.open_db(None).unwrap();
assert_eq!(new_txn.get::<[u8; 4]>(new_db.dbi(), b"key2").unwrap(), Some(*b"val2"));
}
#[test]
fn test_clone_txn_parallel_reads() {
let dir = tempdir().unwrap();
let env: Arc<Environment> = Arc::new(Environment::builder().open(dir.path()).unwrap());
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
for i in 0..100 {
txn.put(
db.dbi(),
format!("key{i:02}").as_bytes(),
format!("val{i:02}").as_bytes(),
WriteFlags::empty(),
)
.unwrap();
}
txn.commit().unwrap();
let base_txn = env.begin_ro_txn().unwrap();
let base_id = base_txn.id().unwrap();
let n = 4usize;
let barrier = Arc::new(Barrier::new(n));
let mut handles: Vec<JoinHandle<(u64, usize)>> = Vec::with_capacity(n);
for i in 0..n {
let cloned_txn = base_txn.clone_txn().unwrap();
let thread_barrier = barrier.clone();
handles.push(thread::spawn(move || {
thread_barrier.wait();
let txn_id = cloned_txn.id().unwrap();
let db = cloned_txn.open_db(None).unwrap();
let mut count = 0usize;
for j in (i * 25)..((i + 1) * 25) {
let val: Cow<'_, [u8]> =
cloned_txn.get(db.dbi(), format!("key{j:02}").as_bytes()).unwrap().unwrap();
assert!(val.starts_with(b"val"));
count += 1;
}
(txn_id, count)
}));
}
for handle in handles {
let (txn_id, count) = handle.join().unwrap();
assert_eq!(txn_id, base_id);
assert_eq!(count, 25);
}
}

View File

@@ -187,21 +187,6 @@ impl<'a> EitherWriter<'a, (), ()> {
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
///
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
/// `None` for other variants.
///
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
match self {
Self::Database(_) | Self::StaticFile(_) => None,
Self::RocksDB(batch) => Some(batch.into_inner()),
}
}
/// Increment the block number.
///
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
@@ -319,24 +304,13 @@ where
CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
{
/// Puts a transaction hash number mapping.
///
/// When `append_only` is true, uses `cursor.append()` which is significantly faster
/// but requires entries to be inserted in order and the table to be empty.
/// When false, uses `cursor.insert()` which handles arbitrary insertion order.
pub fn put_transaction_hash_number(
&mut self,
hash: TxHash,
tx_num: TxNumber,
append_only: bool,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => {
if append_only {
Ok(cursor.append(hash, &tx_num)?)
} else {
Ok(cursor.insert(hash, &tx_num)?)
}
}
Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
@@ -689,18 +663,12 @@ mod tests {
#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use crate::{
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
test_utils::create_test_provider_factory,
RocksDBProviderFactory,
};
use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
use alloy_primitives::{Address, B256};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
tables,
};
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
use tempfile::TempDir;
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
@@ -714,87 +682,6 @@ mod rocksdb_tests {
(temp_dir, provider)
}
/// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
/// when the storage setting is enabled, and that put operations followed by commit
/// persist the data to `RocksDB`.
#[test]
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create EitherWriter with RocksDB
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Verify we got a RocksDB writer
assert!(matches!(writer, EitherWriter::RocksDB(_)));
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the batch and register with provider for commit
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Commit via provider - this commits RocksDB batch too
provider.commit().unwrap();
// Verify data was written to RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
}
/// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
#[test]
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash = B256::from([1u8; 32]);
let tx_num = 100u64;
// First, write a value directly to RocksDB
let rocksdb = factory.rocksdb_provider();
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
// Now delete using EitherWriter
let batch = rocksdb.batch();
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
writer.delete_transaction_hash_number(hash).unwrap();
// Extract the batch and commit via provider
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
provider.commit().unwrap();
// Verify deletion
let rocksdb = factory.rocksdb_provider();
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
}
#[test]
fn test_rocksdb_batch_transaction_hash_numbers() {
let (_temp_dir, provider) = create_rocksdb_provider();
@@ -929,65 +816,4 @@ mod rocksdb_tests {
// Verify deletion
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
}
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
///
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
/// in a single place, making it easier to reason about commit ordering and consistency.
#[test]
fn test_rocksdb_commits_at_provider_level() {
let factory = create_test_provider_factory();
// Enable RocksDB for transaction hash numbers
factory.set_storage_settings_cache(
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
);
let hash1 = B256::from([1u8; 32]);
let hash2 = B256::from([2u8; 32]);
let tx_num1 = 100u64;
let tx_num2 = 200u64;
// Get the RocksDB batch from the provider
let rocksdb = factory.rocksdb_provider();
let batch = rocksdb.batch();
// Create provider and EitherWriter
let provider = factory.database_provider_rw().unwrap();
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
// Write transaction hash numbers (append_only=false since we're using RocksDB)
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
// Extract the raw batch from the writer and register it with the provider
let raw_batch = writer.into_raw_rocksdb_batch();
if let Some(batch) = raw_batch {
provider.set_pending_rocksdb_batch(batch);
}
// Data should NOT be visible yet (batch not committed)
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
None,
"Data should not be visible before provider.commit()"
);
// Commit the provider - this should commit both MDBX and RocksDB
provider.commit().unwrap();
// Now data should be visible in RocksDB
let rocksdb = factory.rocksdb_provider();
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
Some(tx_num1),
"Data should be visible after provider.commit()"
);
assert_eq!(
rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
Some(tx_num2),
"Data should be visible after provider.commit()"
);
}
}

View File

@@ -181,11 +181,6 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.database.rocksdb_provider()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

View File

@@ -153,11 +153,6 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
}
}
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {

View File

@@ -151,6 +151,7 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
/// Database transaction.
tx: TX,
@@ -166,29 +167,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
storage_settings: Arc<RwLock<StorageSettings>>,
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
}
impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("DatabaseProvider");
s.field("tx", &self.tx)
.field("chain_spec", &self.chain_spec)
.field("static_file_provider", &self.static_file_provider)
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
}
}
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
/// Returns reference to prune modes.
pub const fn prune_modes_ref(&self) -> &PruneModes {
@@ -277,11 +259,6 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
fn rocksdb_provider(&self) -> RocksDBProvider {
self.rocksdb_provider.clone()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
}
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
@@ -313,8 +290,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -570,8 +545,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
}
}
@@ -3205,7 +3178,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
self.prune_modes_ref()
}
/// Commit database transaction, static files, and pending `RocksDB` batches.
/// Commit database transaction and static files.
fn commit(self) -> ProviderResult<bool> {
// For unwinding it makes more sense to commit the database first, since if
// it is interrupted before the static files commit, we can just
@@ -3213,27 +3186,9 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
// checkpoints on the next start-up.
if self.static_file_provider.has_unwind_queued() {
self.tx.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.static_file_provider.commit()?;
} else {
self.static_file_provider.commit()?;
#[cfg(all(unix, feature = "rocksdb"))]
{
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
for batch in batches {
self.rocksdb_provider.commit_batch(batch)?;
}
}
self.tx.commit()?;
}

View File

@@ -450,19 +450,6 @@ impl RocksDBProvider {
batch_handle.commit()
})
}
/// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
///
/// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
/// and needs to be committed at a later point (e.g., at provider commit time).
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
}
/// Handle for building a batch of operations atomically.
@@ -542,13 +529,6 @@ impl<'a> RocksDBBatch<'a> {
pub const fn provider(&self) -> &RocksDBProvider {
self.provider
}
/// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
///
/// This is used to defer commits to the provider level.
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
self.inner
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.

View File

@@ -1,5 +1,5 @@
use crate::{
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider},
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
HashingWriter, ProviderFactory, TrieWriter,
};
use alloy_primitives::B256;
@@ -62,10 +62,7 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
db,
chain_spec,
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
RocksDBBuilder::new(&rocksdb_dir)
.with_default_tables()
.build()
.expect("failed to create test RocksDB provider"),
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
)
.expect("failed to create test provider factory")
}

View File

@@ -29,9 +29,4 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
fn rocksdb_provider(&self) -> RocksDBProvider {
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
}
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
// No-op for NoopProvider
}
}

View File

@@ -6,11 +6,4 @@ use crate::providers::RocksDBProvider;
pub trait RocksDBProviderFactory {
/// Returns the `RocksDB` provider.
fn rocksdb_provider(&self) -> RocksDBProvider;
/// Adds a pending `RocksDB` batch to be committed when this provider is committed.
///
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
/// commits, ensuring atomicity across all storage backends.
#[cfg(all(unix, feature = "rocksdb"))]
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
}