mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
tx-clone
...
dan/tx-clo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
299b7f1e74 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -8247,7 +8247,6 @@ dependencies = [
|
||||
"metrics",
|
||||
"metrics-util",
|
||||
"mini-moka",
|
||||
"moka",
|
||||
"parking_lot",
|
||||
"proptest",
|
||||
"rand 0.8.5",
|
||||
@@ -10584,7 +10583,6 @@ dependencies = [
|
||||
"reth-stages-api",
|
||||
"reth-static-file",
|
||||
"reth-static-file-types",
|
||||
"reth-storage-api",
|
||||
"reth-storage-errors",
|
||||
"reth-testing-utils",
|
||||
"reth-trie",
|
||||
|
||||
@@ -587,7 +587,6 @@ url = { version = "2.3", default-features = false }
|
||||
zstd = "0.13"
|
||||
byteorder = "1"
|
||||
mini-moka = "0.10"
|
||||
moka = "0.12"
|
||||
tar-no-std = { version = "0.3.2", default-features = false }
|
||||
miniz_oxide = { version = "0.8.4", default-features = false }
|
||||
chrono = "0.4.41"
|
||||
|
||||
@@ -53,7 +53,6 @@ futures.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "sync", "macros"] }
|
||||
mini-moka = { workspace = true, features = ["sync"] }
|
||||
moka = { workspace = true, features = ["sync"] }
|
||||
smallvec.workspace = true
|
||||
|
||||
# metrics
|
||||
|
||||
@@ -393,7 +393,7 @@ where
|
||||
metrics,
|
||||
terminate_execution,
|
||||
precompile_cache_disabled,
|
||||
precompile_cache_map,
|
||||
mut precompile_cache_map,
|
||||
} = self;
|
||||
|
||||
let mut state_provider = match provider.build() {
|
||||
|
||||
@@ -1,56 +1,50 @@
|
||||
//! Contains a precompile cache backed by `schnellru::LruMap` (LRU by length).
|
||||
|
||||
use alloy_primitives::Bytes;
|
||||
use dashmap::DashMap;
|
||||
use parking_lot::Mutex;
|
||||
use reth_evm::precompiles::{DynPrecompile, Precompile, PrecompileInput};
|
||||
use revm::precompile::{PrecompileId, PrecompileOutput, PrecompileResult};
|
||||
use revm_primitives::Address;
|
||||
use std::{hash::Hash, sync::Arc};
|
||||
use schnellru::LruMap;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
hash::{Hash, Hasher},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
/// Default max cache size for [`PrecompileCache`]
|
||||
const MAX_CACHE_SIZE: u32 = 10_000;
|
||||
|
||||
/// Stores caches for each precompile.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct PrecompileCacheMap<S>(Arc<DashMap<Address, PrecompileCache<S>>>)
|
||||
pub struct PrecompileCacheMap<S>(HashMap<Address, PrecompileCache<S>>)
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
|
||||
|
||||
impl<S> PrecompileCacheMap<S>
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
pub(crate) fn cache_for_address(&self, address: Address) -> PrecompileCache<S> {
|
||||
// Try just using `.get` first to avoid acquiring a write lock.
|
||||
if let Some(cache) = self.0.get(&address) {
|
||||
return cache.clone();
|
||||
}
|
||||
// Otherwise, fallback to `.entry` and initialize the cache.
|
||||
//
|
||||
// This should be very rare as caches for all precompiles will be initialized as soon as
|
||||
// first EVM is created.
|
||||
pub(crate) fn cache_for_address(&mut self, address: Address) -> PrecompileCache<S> {
|
||||
self.0.entry(address).or_default().clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache for precompiles, for each input stores the result.
|
||||
///
|
||||
/// [`LruMap`] requires a mutable reference on `get` since it updates the LRU order,
|
||||
/// so we use a [`Mutex`] instead of an `RwLock`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PrecompileCache<S>(
|
||||
moka::sync::Cache<Bytes, CacheEntry<S>, alloy_primitives::map::DefaultHashBuilder>,
|
||||
)
|
||||
pub struct PrecompileCache<S>(Arc<Mutex<LruMap<CacheKey<S>, CacheEntry>>>)
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static;
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone;
|
||||
|
||||
impl<S> Default for PrecompileCache<S>
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self(
|
||||
moka::sync::CacheBuilder::new(MAX_CACHE_SIZE as u64)
|
||||
.initial_capacity(MAX_CACHE_SIZE as usize)
|
||||
.build_with_hasher(Default::default()),
|
||||
)
|
||||
Self(Arc::new(Mutex::new(LruMap::new(schnellru::ByLength::new(MAX_CACHE_SIZE)))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,31 +52,63 @@ impl<S> PrecompileCache<S>
|
||||
where
|
||||
S: Eq + Hash + std::fmt::Debug + Send + Sync + Clone + 'static,
|
||||
{
|
||||
fn get(&self, input: &[u8], spec: S) -> Option<CacheEntry<S>> {
|
||||
self.0.get(input).filter(|e| e.spec == spec)
|
||||
fn get(&self, key: &CacheKeyRef<'_, S>) -> Option<CacheEntry> {
|
||||
self.0.lock().get(key).cloned()
|
||||
}
|
||||
|
||||
/// Inserts the given key and value into the cache, returning the new cache size.
|
||||
fn insert(&self, input: Bytes, value: CacheEntry<S>) -> usize {
|
||||
self.0.insert(input, value);
|
||||
self.0.entry_count() as usize
|
||||
fn insert(&self, key: CacheKey<S>, value: CacheEntry) -> usize {
|
||||
let mut cache = self.0.lock();
|
||||
cache.insert(key, value);
|
||||
cache.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache key, spec id and precompile call input. spec id is included in the key to account for
|
||||
/// precompile repricing across fork activations.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct CacheKey<S>((S, Bytes));
|
||||
|
||||
impl<S> CacheKey<S> {
|
||||
const fn new(spec_id: S, input: Bytes) -> Self {
|
||||
Self((spec_id, input))
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache key reference, used to avoid cloning the input bytes when looking up using a [`CacheKey`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct CacheKeyRef<'a, S>((S, &'a [u8]));
|
||||
|
||||
impl<'a, S> CacheKeyRef<'a, S> {
|
||||
const fn new(spec_id: S, input: &'a [u8]) -> Self {
|
||||
Self((spec_id, input))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: PartialEq> PartialEq<CacheKey<S>> for CacheKeyRef<'_, S> {
|
||||
fn eq(&self, other: &CacheKey<S>) -> bool {
|
||||
self.0 .0 == other.0 .0 && self.0 .1 == other.0 .1.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: Hash> Hash for CacheKeyRef<'a, S> {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.0 .0.hash(state);
|
||||
self.0 .1.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cache entry, precompile successful output.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct CacheEntry<S> {
|
||||
output: PrecompileOutput,
|
||||
spec: S,
|
||||
}
|
||||
pub struct CacheEntry(PrecompileOutput);
|
||||
|
||||
impl<S> CacheEntry<S> {
|
||||
impl CacheEntry {
|
||||
const fn gas_used(&self) -> u64 {
|
||||
self.output.gas_used
|
||||
self.0.gas_used
|
||||
}
|
||||
|
||||
fn to_precompile_result(&self) -> PrecompileResult {
|
||||
Ok(self.output.clone())
|
||||
Ok(self.0.clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +190,9 @@ where
|
||||
}
|
||||
|
||||
fn call(&self, input: PrecompileInput<'_>) -> PrecompileResult {
|
||||
if let Some(entry) = &self.cache.get(input.data, self.spec_id.clone()) {
|
||||
let key = CacheKeyRef::new(self.spec_id.clone(), input.data);
|
||||
|
||||
if let Some(entry) = &self.cache.get(&key) {
|
||||
self.increment_by_one_precompile_cache_hits();
|
||||
if input.gas >= entry.gas_used() {
|
||||
return entry.to_precompile_result()
|
||||
@@ -176,10 +204,8 @@ where
|
||||
|
||||
match &result {
|
||||
Ok(output) => {
|
||||
let size = self.cache.insert(
|
||||
Bytes::copy_from_slice(calldata),
|
||||
CacheEntry { output: output.clone(), spec: self.spec_id.clone() },
|
||||
);
|
||||
let key = CacheKey::new(self.spec_id.clone(), Bytes::copy_from_slice(calldata));
|
||||
let size = self.cache.insert(key, CacheEntry(output.clone()));
|
||||
self.set_precompile_cache_size_metric(size as f64);
|
||||
self.increment_by_one_precompile_cache_misses();
|
||||
}
|
||||
@@ -220,12 +246,31 @@ impl CachedPrecompileMetrics {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::hash::DefaultHasher;
|
||||
|
||||
use super::*;
|
||||
use reth_evm::{EthEvmFactory, Evm, EvmEnv, EvmFactory};
|
||||
use reth_revm::db::EmptyDB;
|
||||
use revm::{context::TxEnv, precompile::PrecompileOutput};
|
||||
use revm_primitives::hardfork::SpecId;
|
||||
|
||||
#[test]
|
||||
fn test_cache_key_ref_hash() {
|
||||
let key1 = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
|
||||
let key2 = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
|
||||
assert!(PartialEq::eq(&key2, &key1));
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key1.hash(&mut hasher);
|
||||
let hash1 = hasher.finish();
|
||||
|
||||
let mut hasher = DefaultHasher::new();
|
||||
key2.hash(&mut hasher);
|
||||
let hash2 = hasher.finish();
|
||||
|
||||
assert_eq!(hash1, hash2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_precompile_cache_basic() {
|
||||
let dyn_precompile: DynPrecompile = (|_input: PrecompileInput<'_>| -> PrecompileResult {
|
||||
@@ -248,11 +293,12 @@ mod tests {
|
||||
reverted: false,
|
||||
};
|
||||
|
||||
let input = b"test_input";
|
||||
let expected = CacheEntry { output, spec: SpecId::PRAGUE };
|
||||
cache.cache.insert(input.into(), expected.clone());
|
||||
let key = CacheKey::new(SpecId::PRAGUE, b"test_input".into());
|
||||
let expected = CacheEntry(output);
|
||||
cache.cache.insert(key, expected.clone());
|
||||
|
||||
let actual = cache.cache.get(input, SpecId::PRAGUE).unwrap();
|
||||
let key = CacheKeyRef::new(SpecId::PRAGUE, b"test_input");
|
||||
let actual = cache.cache.get(&key).unwrap();
|
||||
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
@@ -266,7 +312,7 @@ mod tests {
|
||||
let address1 = Address::repeat_byte(1);
|
||||
let address2 = Address::repeat_byte(2);
|
||||
|
||||
let cache_map = PrecompileCacheMap::default();
|
||||
let mut cache_map = PrecompileCacheMap::default();
|
||||
|
||||
// create the first precompile with a specific output
|
||||
let precompile1: DynPrecompile = (PrecompileId::custom("custom"), {
|
||||
|
||||
@@ -75,7 +75,6 @@ reth-network-p2p = { workspace = true, features = ["test-utils"] }
|
||||
reth-downloaders.workspace = true
|
||||
reth-static-file.workspace = true
|
||||
reth-stages-api = { workspace = true, features = ["test-utils"] }
|
||||
reth-storage-api.workspace = true
|
||||
reth-testing-utils.workspace = true
|
||||
reth-trie = { workspace = true, features = ["test-utils"] }
|
||||
reth-provider = { workspace = true, features = ["test-utils"] }
|
||||
@@ -117,7 +116,6 @@ test-utils = [
|
||||
"reth-ethereum-primitives?/test-utils",
|
||||
"reth-evm-ethereum/test-utils",
|
||||
]
|
||||
rocksdb = ["reth-provider/rocksdb"]
|
||||
|
||||
[[bench]]
|
||||
name = "criterion"
|
||||
|
||||
@@ -3,16 +3,17 @@ use alloy_primitives::{TxHash, TxNumber};
|
||||
use num_traits::Zero;
|
||||
use reth_config::config::{EtlConfig, TransactionLookupConfig};
|
||||
use reth_db_api::{
|
||||
table::{Decode, Decompress, Value},
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
table::Value,
|
||||
tables,
|
||||
transaction::DbTxMut,
|
||||
RawKey, RawValue,
|
||||
};
|
||||
use reth_etl::Collector;
|
||||
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
|
||||
use reth_provider::{
|
||||
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
|
||||
TransactionsProvider, TransactionsProviderExt,
|
||||
BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
|
||||
};
|
||||
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
|
||||
use reth_stages_api::{
|
||||
@@ -64,9 +65,7 @@ where
|
||||
+ PruneCheckpointReader
|
||||
+ StatsReader
|
||||
+ StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
|
||||
+ TransactionsProviderExt
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory,
|
||||
+ TransactionsProviderExt,
|
||||
{
|
||||
/// Return the id of the stage
|
||||
fn id(&self) -> StageId {
|
||||
@@ -151,27 +150,16 @@ where
|
||||
);
|
||||
|
||||
if range_output.is_final_range {
|
||||
let total_hashes = hash_collector.len();
|
||||
let interval = (total_hashes / 10).max(1);
|
||||
|
||||
// Use append mode when table is empty (first sync) - significantly faster
|
||||
let append_only =
|
||||
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
|
||||
let mut txhash_cursor = provider
|
||||
.tx_ref()
|
||||
.cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
|
||||
|
||||
// Create RocksDB batch if feature is enabled
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
let total_hashes = hash_collector.len();
|
||||
let interval = (total_hashes / 10).max(1);
|
||||
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
|
||||
let (hash_bytes, number_bytes) = hash_to_number?;
|
||||
let (hash, number) = hash_to_number?;
|
||||
if index > 0 && index.is_multiple_of(interval) {
|
||||
info!(
|
||||
target: "sync::stages::transaction_lookup",
|
||||
@@ -181,16 +169,12 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
// Decode from raw ETL bytes
|
||||
let hash = TxHash::decode(&hash_bytes)?;
|
||||
let tx_num = TxNumber::decompress(&number_bytes)?;
|
||||
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
|
||||
}
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
let key = RawKey::<TxHash>::from_vec(hash);
|
||||
if append_only {
|
||||
txhash_cursor.append(key, &RawValue::<TxNumber>::from_vec(number))?
|
||||
} else {
|
||||
txhash_cursor.insert(key, &RawValue::<TxNumber>::from_vec(number))?
|
||||
}
|
||||
}
|
||||
|
||||
trace!(target: "sync::stages::transaction_lookup",
|
||||
@@ -215,19 +199,11 @@ where
|
||||
provider: &Provider,
|
||||
input: UnwindInput,
|
||||
) -> Result<UnwindOutput, StageError> {
|
||||
let tx = provider.tx_ref();
|
||||
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
|
||||
|
||||
// Create RocksDB batch if feature is enabled
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
// Cursor to unwind tx hash to number
|
||||
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
let rev_walker = provider
|
||||
.block_body_indices_range(range.clone())?
|
||||
@@ -242,18 +218,15 @@ where
|
||||
|
||||
// Delete all transactions that belong to this block
|
||||
for tx_id in body.tx_num_range() {
|
||||
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
|
||||
writer.delete_transaction_hash_number(transaction.trie_hash())?;
|
||||
// First delete the transaction and hash to id mapping
|
||||
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? &&
|
||||
tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some()
|
||||
{
|
||||
tx_hash_number_cursor.delete_current()?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
Ok(UnwindOutput {
|
||||
checkpoint: StageCheckpoint::new(unwind_to)
|
||||
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
|
||||
@@ -293,7 +266,7 @@ mod tests {
|
||||
};
|
||||
use alloy_primitives::{BlockNumber, B256};
|
||||
use assert_matches::assert_matches;
|
||||
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
|
||||
use reth_db_api::transaction::DbTx;
|
||||
use reth_ethereum_primitives::Block;
|
||||
use reth_primitives_traits::SealedBlock;
|
||||
use reth_provider::{
|
||||
@@ -608,160 +581,4 @@ mod tests {
|
||||
self.ensure_no_hash_by_block(input.unwind_to)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use reth_provider::RocksDBProviderFactory;
|
||||
use reth_storage_api::StorageSettings;
|
||||
|
||||
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
|
||||
/// writes transaction hash mappings to `RocksDB` instead of MDBX.
|
||||
#[tokio::test]
|
||||
async fn execute_writes_to_rocksdb_when_enabled() {
|
||||
let (previous_stage, stage_progress) = (110, 100);
|
||||
let mut rng = generators::rng();
|
||||
|
||||
// Set up the runner
|
||||
let runner = TransactionLookupTestRunner::default();
|
||||
|
||||
// Enable RocksDB for transaction hash numbers
|
||||
runner.db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let input = ExecInput {
|
||||
target: Some(previous_stage),
|
||||
checkpoint: Some(StageCheckpoint::new(stage_progress)),
|
||||
};
|
||||
|
||||
// Insert blocks with transactions
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
stage_progress + 1..=previous_stage,
|
||||
BlockRangeParams {
|
||||
parent: Some(B256::ZERO),
|
||||
tx_count: 1..3, // Ensure we have transactions
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
runner
|
||||
.db
|
||||
.insert_blocks(blocks.iter(), StorageKind::Static)
|
||||
.expect("failed to insert blocks");
|
||||
|
||||
// Count expected transactions
|
||||
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
|
||||
assert!(expected_tx_count > 0, "test requires at least one transaction");
|
||||
|
||||
// Execute the stage
|
||||
let rx = runner.execute(input);
|
||||
let result = rx.await.unwrap();
|
||||
assert!(result.is_ok(), "stage execution failed: {:?}", result);
|
||||
|
||||
// Verify MDBX table is empty (data should be in RocksDB)
|
||||
let mdbx_count = runner.db.count_entries::<tables::TransactionHashNumbers>().unwrap();
|
||||
assert_eq!(
|
||||
mdbx_count, 0,
|
||||
"MDBX TransactionHashNumbers should be empty when RocksDB is enabled"
|
||||
);
|
||||
|
||||
// Verify RocksDB has the data
|
||||
let rocksdb = runner.db.factory.rocksdb_provider();
|
||||
let mut rocksdb_count = 0;
|
||||
for block in &blocks {
|
||||
for tx in &block.body().transactions {
|
||||
let hash = *tx.tx_hash();
|
||||
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
|
||||
assert!(result.is_some(), "Transaction hash {:?} not found in RocksDB", hash);
|
||||
rocksdb_count += 1;
|
||||
}
|
||||
}
|
||||
assert_eq!(
|
||||
rocksdb_count, expected_tx_count,
|
||||
"RocksDB should contain all transaction hashes"
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that when `transaction_hash_numbers_in_rocksdb` is enabled, the stage
|
||||
/// unwind deletes transaction hash mappings from `RocksDB` instead of MDBX.
|
||||
#[tokio::test]
|
||||
async fn unwind_deletes_from_rocksdb_when_enabled() {
|
||||
let (previous_stage, stage_progress) = (110, 100);
|
||||
let mut rng = generators::rng();
|
||||
|
||||
// Set up the runner
|
||||
let runner = TransactionLookupTestRunner::default();
|
||||
|
||||
// Enable RocksDB for transaction hash numbers
|
||||
runner.db.factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
|
||||
);
|
||||
|
||||
// Insert blocks with transactions
|
||||
let blocks = random_block_range(
|
||||
&mut rng,
|
||||
stage_progress + 1..=previous_stage,
|
||||
BlockRangeParams {
|
||||
parent: Some(B256::ZERO),
|
||||
tx_count: 1..3, // Ensure we have transactions
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
runner
|
||||
.db
|
||||
.insert_blocks(blocks.iter(), StorageKind::Static)
|
||||
.expect("failed to insert blocks");
|
||||
|
||||
// Count expected transactions
|
||||
let expected_tx_count: usize = blocks.iter().map(|b| b.body().transactions.len()).sum();
|
||||
assert!(expected_tx_count > 0, "test requires at least one transaction");
|
||||
|
||||
// Execute the stage first to populate RocksDB
|
||||
let exec_input = ExecInput {
|
||||
target: Some(previous_stage),
|
||||
checkpoint: Some(StageCheckpoint::new(stage_progress)),
|
||||
};
|
||||
let rx = runner.execute(exec_input);
|
||||
let result = rx.await.unwrap();
|
||||
assert!(result.is_ok(), "stage execution failed: {:?}", result);
|
||||
|
||||
// Verify RocksDB has the data before unwind
|
||||
let rocksdb = runner.db.factory.rocksdb_provider();
|
||||
for block in &blocks {
|
||||
for tx in &block.body().transactions {
|
||||
let hash = *tx.tx_hash();
|
||||
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
|
||||
assert!(
|
||||
result.is_some(),
|
||||
"Transaction hash {:?} should exist before unwind",
|
||||
hash
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Now unwind to stage_progress (removing all the blocks we added)
|
||||
let unwind_input = UnwindInput {
|
||||
checkpoint: StageCheckpoint::new(previous_stage),
|
||||
unwind_to: stage_progress,
|
||||
bad_block: None,
|
||||
};
|
||||
let unwind_result = runner.unwind(unwind_input).await;
|
||||
assert!(unwind_result.is_ok(), "stage unwind failed: {:?}", unwind_result);
|
||||
|
||||
// Verify RocksDB data is deleted after unwind
|
||||
let rocksdb = runner.db.factory.rocksdb_provider();
|
||||
for block in &blocks {
|
||||
for tx in &block.body().transactions {
|
||||
let hash = *tx.tx_hash();
|
||||
let result = rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap();
|
||||
assert!(
|
||||
result.is_none(),
|
||||
"Transaction hash {:?} should be deleted from RocksDB after unwind",
|
||||
hash
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ impl Default for TestStageDB {
|
||||
create_test_rw_db(),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create test provider factory"),
|
||||
}
|
||||
@@ -68,7 +68,7 @@ impl TestStageDB {
|
||||
create_test_rw_db_with_path(path),
|
||||
MAINNET.clone(),
|
||||
StaticFileProvider::read_write(static_dir_path).unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).with_default_tables().build().unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
)
|
||||
.expect("failed to create test provider factory"),
|
||||
}
|
||||
|
||||
167
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
167
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
@@ -12936,123 +12936,122 @@ int mdbx_txn_renew(MDBX_txn *txn) {
|
||||
return LOG_IFERR(rc);
|
||||
}
|
||||
|
||||
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
|
||||
if (unlikely(!dest))
|
||||
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out) {
|
||||
if (unlikely(!out))
|
||||
return LOG_IFERR(MDBX_EINVAL);
|
||||
*dest = nullptr;
|
||||
*out = nullptr;
|
||||
|
||||
int rc = check_txn(src, MDBX_TXN_BLOCKED);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return LOG_IFERR(rc);
|
||||
if (unlikely(!src))
|
||||
return LOG_IFERR(MDBX_EINVAL);
|
||||
|
||||
if (unlikely(src->signature != txn_signature))
|
||||
return LOG_IFERR(MDBX_EBADSIGN);
|
||||
|
||||
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
|
||||
return LOG_IFERR(MDBX_EINVAL);
|
||||
return LOG_IFERR(MDBX_BAD_TXN);
|
||||
|
||||
if (unlikely(src->flags & (MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_PARKED)))
|
||||
return LOG_IFERR(MDBX_BAD_TXN);
|
||||
|
||||
MDBX_env *const env = src->env;
|
||||
rc = check_env(env, true);
|
||||
int rc = check_env(env, true);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return LOG_IFERR(rc);
|
||||
|
||||
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
|
||||
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
|
||||
if (unlikely(!env->lck_mmap.lck))
|
||||
return LOG_IFERR(MDBX_EPERM);
|
||||
|
||||
const txnid_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease);
|
||||
if (unlikely(src->txnid < snap_oldest))
|
||||
return LOG_IFERR(MDBX_MVCC_RETARDED);
|
||||
|
||||
MDBX_txn *txn = nullptr;
|
||||
const intptr_t bitmap_bytes =
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
|
||||
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(src->dbi_sparse[0])) / CHAR_BIT;
|
||||
#else
|
||||
0;
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
|
||||
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
|
||||
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
|
||||
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
|
||||
const size_t base = sizeof(MDBX_txn) - sizeof(src->tw) + sizeof(src->to);
|
||||
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(src->dbi_seqs[0]) +
|
||||
env->max_dbi * (sizeof(src->dbs[0]) + sizeof(src->cursors[0]) + sizeof(src->dbi_state[0]));
|
||||
|
||||
txn = osal_malloc(size);
|
||||
if (unlikely(txn == nullptr))
|
||||
MDBX_txn *clone = osal_malloc(size);
|
||||
if (unlikely(clone == nullptr))
|
||||
return LOG_IFERR(MDBX_ENOMEM);
|
||||
|
||||
#if MDBX_DEBUG
|
||||
memset(txn, 0xCD, size);
|
||||
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
|
||||
memset(clone, 0xCD, size);
|
||||
VALGRIND_MAKE_MEM_UNDEFINED(clone, size);
|
||||
#endif /* MDBX_DEBUG */
|
||||
MDBX_ANALYSIS_ASSUME(size > base);
|
||||
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
|
||||
txn->dbs = ptr_disp(txn, base);
|
||||
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
|
||||
#if MDBX_DEBUG
|
||||
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
|
||||
#endif
|
||||
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
|
||||
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
|
||||
memset(clone, 0, base);
|
||||
|
||||
clone->dbs = ptr_disp(clone, base);
|
||||
clone->cursors = ptr_disp(clone->dbs, env->max_dbi * sizeof(clone->dbs[0]));
|
||||
clone->dbi_state = ptr_disp(clone, size - env->max_dbi * sizeof(clone->dbi_state[0]));
|
||||
clone->dbi_seqs = ptr_disp(clone->cursors, env->max_dbi * sizeof(clone->cursors[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
|
||||
clone->dbi_sparse = ptr_disp(clone->dbi_state, -bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
txn->env = env;
|
||||
txn->flags = src->flags & ~txn_state_flags;
|
||||
txn->parent = nullptr;
|
||||
txn->nested = nullptr;
|
||||
clone->flags = src->flags;
|
||||
clone->env = env;
|
||||
clone->parent = nullptr;
|
||||
clone->nested = nullptr;
|
||||
|
||||
txn->txnid = src->txnid;
|
||||
txn->front_txnid = src->front_txnid;
|
||||
txn->geo = src->geo;
|
||||
txn->canary = src->canary;
|
||||
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
|
||||
|
||||
if (unlikely(src->n_dbi > env->max_dbi)) {
|
||||
rc = MDBX_CORRUPTED;
|
||||
goto bailout;
|
||||
bsr_t brs = mvcc_bind_slot(env);
|
||||
if (unlikely(brs.err != MDBX_SUCCESS)) {
|
||||
osal_free(clone);
|
||||
return LOG_IFERR(brs.err);
|
||||
}
|
||||
clone->to.reader = brs.rslot;
|
||||
|
||||
txn->n_dbi = src->n_dbi;
|
||||
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
|
||||
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
|
||||
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
|
||||
const uint32_t snapshot_pages_used = src->to.reader ? atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed)
|
||||
: (uint32_t)src->geo.first_unallocated;
|
||||
const uint64_t snapshot_pages_retired =
|
||||
src->to.reader ? atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed) : 0;
|
||||
|
||||
atomic_store32(&clone->to.reader->snapshot_pages_used, snapshot_pages_used, mo_Relaxed);
|
||||
atomic_store64(&clone->to.reader->snapshot_pages_retired, snapshot_pages_retired, mo_Relaxed);
|
||||
safe64_write(&clone->to.reader->txnid, src->txnid);
|
||||
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
|
||||
|
||||
clone->txnid = src->txnid;
|
||||
clone->front_txnid = src->front_txnid;
|
||||
clone->geo = src->geo;
|
||||
clone->canary = src->canary;
|
||||
clone->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
|
||||
|
||||
const size_t n_dbi = src->n_dbi;
|
||||
clone->n_dbi = n_dbi;
|
||||
memcpy(clone->dbs, src->dbs, n_dbi * sizeof(clone->dbs[0]));
|
||||
memcpy(clone->dbi_seqs, src->dbi_seqs, n_dbi * sizeof(clone->dbi_seqs[0]));
|
||||
memcpy(clone->dbi_state, src->dbi_state, n_dbi * sizeof(clone->dbi_state[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
if (bitmap_bytes)
|
||||
memset(txn->dbi_sparse, 0, bitmap_bytes);
|
||||
memcpy(clone->dbi_sparse, src->dbi_sparse, bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
|
||||
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
|
||||
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
if (bitmap_bytes)
|
||||
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
memset(clone->cursors, 0, env->max_dbi * sizeof(clone->cursors[0]));
|
||||
clone->userctx = nullptr;
|
||||
|
||||
txn->to.reader = nullptr;
|
||||
if (env->lck_mmap.lck) {
|
||||
bsr_t brs = mvcc_bind_slot(env);
|
||||
if (unlikely(brs.err != MDBX_SUCCESS)) {
|
||||
rc = brs.err;
|
||||
goto bailout;
|
||||
}
|
||||
txn->to.reader = brs.rslot;
|
||||
safe64_reset(&txn->to.reader->txnid, true);
|
||||
if (src->to.reader) {
|
||||
atomic_store32(&txn->to.reader->snapshot_pages_used,
|
||||
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
|
||||
atomic_store64(&txn->to.reader->snapshot_pages_retired,
|
||||
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
|
||||
} else {
|
||||
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
|
||||
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
|
||||
}
|
||||
safe64_write(&txn->to.reader->txnid, src->txnid);
|
||||
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
|
||||
#if defined(_WIN32) || defined(_WIN64)
|
||||
const size_t used_bytes = pgno2bytes(env, clone->geo.first_unallocated);
|
||||
if (((used_bytes > env->geo_in_bytes.lower && env->geo_in_bytes.shrink) ||
|
||||
(globals.running_under_Wine && used_bytes < env->geo_in_bytes.upper && env->geo_in_bytes.grow)) &&
|
||||
(clone->flags & MDBX_NOSTICKYTHREADS) == 0) {
|
||||
clone->flags |= txn_shrink_allowed;
|
||||
imports.srwl_AcquireShared(&env->remap_guard);
|
||||
}
|
||||
#endif /* Windows */
|
||||
|
||||
txn->signature = txn_signature;
|
||||
txn->userctx = nullptr;
|
||||
*dest = txn;
|
||||
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
|
||||
dxb_sanitize_tail(env, clone);
|
||||
clone->signature = txn_signature;
|
||||
|
||||
DEBUG("clone txn %" PRIaTXN " %p from %p on env %p, root page %" PRIaPGNO "/%" PRIaPGNO, clone->txnid, (void *)clone,
|
||||
(void *)src, (void *)env, clone->dbs[MAIN_DBI].root, clone->dbs[FREE_DBI].root);
|
||||
|
||||
*out = clone;
|
||||
return MDBX_SUCCESS;
|
||||
|
||||
bailout:
|
||||
osal_free(txn);
|
||||
return LOG_IFERR(rc);
|
||||
}
|
||||
|
||||
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
|
||||
|
||||
@@ -3882,35 +3882,6 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
|
||||
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
|
||||
void *context);
|
||||
|
||||
/** \brief Clone a read-only transaction snapshot.
|
||||
* \ingroup c_transactions
|
||||
*
|
||||
* Creates a new read-only transaction that uses the same MVCC snapshot as
|
||||
* the \p src transaction. This allows parallel read operations across threads
|
||||
* without re-opening a read transaction and re-fetching state.
|
||||
*
|
||||
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
|
||||
* to be enabled for the environment. Otherwise it will return
|
||||
* \ref MDBX_TXN_OVERLAPPING.
|
||||
*
|
||||
* \note The \p src transaction must be an active read-only transaction.
|
||||
*
|
||||
* \note The \p src transaction and the cloned transaction must not be used
|
||||
* concurrently from multiple threads. Each transaction and its cursors must
|
||||
* be confined to a single thread at a time.
|
||||
*
|
||||
* \param [in] src A read-only transaction handle returned by
|
||||
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
|
||||
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
|
||||
* stored. Must not be NULL.
|
||||
*
|
||||
* \returns A non-zero error value on failure and 0 on success.
|
||||
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
|
||||
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
|
||||
* \retval MDBX_READERS_FULL Reader lock table is full.
|
||||
* \retval MDBX_ENOMEM Out of memory. */
|
||||
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
|
||||
|
||||
/** \brief Create a transaction for use with the environment.
|
||||
* \ingroup c_transactions
|
||||
*
|
||||
@@ -4432,6 +4403,49 @@ LIBMDBX_API int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted);
|
||||
* \retval MDBX_EINVAL Transaction handle is NULL. */
|
||||
LIBMDBX_API int mdbx_txn_renew(MDBX_txn *txn);
|
||||
|
||||
/** \brief Clone a read-only transaction.
|
||||
* \ingroup c_transactions
|
||||
*
|
||||
* This creates a new read-only transaction that uses the same MVCC snapshot
|
||||
* as the source transaction. The cloned transaction can be used independently
|
||||
* and concurrently with the source transaction (but each transaction must
|
||||
* still only be used by one thread at a time).
|
||||
*
|
||||
* This is useful for parallelizing read operations across multiple threads
|
||||
* while ensuring all threads see a consistent view of the database.
|
||||
*
|
||||
* \note Only read-only transactions can be cloned.
|
||||
*
|
||||
* \note The cloned transaction must be aborted or committed independently
|
||||
* of the source transaction.
|
||||
*
|
||||
* \note The source transaction must remain valid (not aborted, reset, or
|
||||
* committed) for the duration of this call.
|
||||
*
|
||||
* \note In sticky-thread mode (the default), the cloned transaction is bound
|
||||
* to the thread that calls this function. In \ref MDBX_NOSTICKYTHREADS mode,
|
||||
* the cloned transaction can be used from any thread.
|
||||
*
|
||||
* \param [in] src A read-only transaction handle to clone.
|
||||
* \param [out] out Address where the new \ref MDBX_txn handle will be stored.
|
||||
*
|
||||
* \returns A non-zero error value on failure and 0 on success,
|
||||
* some possible errors are:
|
||||
* \retval MDBX_PANIC A fatal error occurred earlier and the
|
||||
* environment must be shut down.
|
||||
* \retval MDBX_BAD_TXN The source transaction is not read-only,
|
||||
* has already finished, is parked, or in error.
|
||||
* \retval MDBX_EBADSIGN Transaction object has invalid signature.
|
||||
* \retval MDBX_MVCC_RETARDED The MVCC snapshot used by source transaction
|
||||
* has been reclaimed by a writer.
|
||||
* \retval MDBX_READERS_FULL The reader lock table is full.
|
||||
* See \ref mdbx_env_set_maxreaders().
|
||||
* \retval MDBX_EPERM Environment opened in exclusive mode
|
||||
* (no reader table available).
|
||||
* \retval MDBX_ENOMEM Out of memory.
|
||||
* \retval MDBX_EINVAL An invalid parameter was specified. */
|
||||
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out);
|
||||
|
||||
/** \brief The fours integers markers (aka "canary") associated with the
|
||||
* environment.
|
||||
* \ingroup c_crud
|
||||
|
||||
@@ -483,20 +483,6 @@ impl Transaction<RW> {
|
||||
}
|
||||
|
||||
impl Transaction<RO> {
|
||||
/// Clones this read-only transaction, preserving the same MVCC snapshot.
|
||||
///
|
||||
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
|
||||
/// The cloned transaction must not be used concurrently with this transaction from multiple
|
||||
/// threads.
|
||||
pub fn clone_snapshot(&self) -> Result<Self> {
|
||||
let cloned = self.txn_execute(|txn| {
|
||||
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
|
||||
})??;
|
||||
|
||||
Ok(Self::new_from_ptr(self.env().clone(), cloned))
|
||||
}
|
||||
|
||||
/// Closes the database handle.
|
||||
///
|
||||
/// # Safety
|
||||
@@ -507,6 +493,23 @@ impl Transaction<RO> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a new read-only transaction that uses the same MVCC snapshot as this transaction.
|
||||
///
|
||||
/// The cloned transaction can be used independently and concurrently with the source
|
||||
/// transaction (but each transaction must still only be used by one thread at a time).
|
||||
///
|
||||
/// This is useful for parallelizing read operations across multiple threads while ensuring
|
||||
/// all threads see a consistent view of the database.
|
||||
pub fn clone_txn(&self) -> Result<Self> {
|
||||
self.txn_execute(|txn| {
|
||||
let mut out: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||
unsafe {
|
||||
mdbx_result(ffi::mdbx_txn_clone(txn, &mut out))?;
|
||||
Ok(Self::new_from_ptr(self.env().clone(), out))
|
||||
}
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
impl Transaction<RW> {
|
||||
|
||||
@@ -375,33 +375,135 @@ fn test_stat_dupsort() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_txn_clone_snapshot() {
|
||||
fn test_clone_txn_same_snapshot() {
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder().open(dir.path()).unwrap();
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.open_db(None).unwrap();
|
||||
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
}
|
||||
|
||||
let ro = env.begin_ro_txn().unwrap();
|
||||
let clone = ro.clone_snapshot().unwrap();
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
|
||||
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
|
||||
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.open_db(None).unwrap();
|
||||
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
}
|
||||
let txn1 = env.begin_ro_txn().unwrap();
|
||||
let txn1_id = txn1.id().unwrap();
|
||||
|
||||
let db = ro.open_db(None).unwrap();
|
||||
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
|
||||
let txn2 = txn1.clone_txn().unwrap();
|
||||
let txn2_id = txn2.id().unwrap();
|
||||
|
||||
let db = clone.open_db(None).unwrap();
|
||||
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
|
||||
assert_eq!(txn1_id, txn2_id, "cloned txn should have same txnid");
|
||||
|
||||
let ro2 = env.begin_ro_txn().unwrap();
|
||||
let db = ro2.open_db(None).unwrap();
|
||||
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
|
||||
let db1 = txn1.open_db(None).unwrap();
|
||||
let db2 = txn2.open_db(None).unwrap();
|
||||
|
||||
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key1").unwrap(), Some(*b"val1"));
|
||||
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
|
||||
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key2").unwrap(), Some(*b"val2"));
|
||||
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key2").unwrap(), Some(*b"val2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clone_txn_independent_lifetime() {
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder().open(dir.path()).unwrap();
|
||||
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
|
||||
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
|
||||
let txn1 = env.begin_ro_txn().unwrap();
|
||||
let txn2 = txn1.clone_txn().unwrap();
|
||||
let txn2_id = txn2.id().unwrap();
|
||||
|
||||
drop(txn1);
|
||||
|
||||
let db2 = txn2.open_db(None).unwrap();
|
||||
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
|
||||
assert_eq!(txn2.id().unwrap(), txn2_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clone_txn_sees_same_snapshot_after_write() {
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder().open(dir.path()).unwrap();
|
||||
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
|
||||
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
|
||||
let ro_txn = env.begin_ro_txn().unwrap();
|
||||
let cloned_txn = ro_txn.clone_txn().unwrap();
|
||||
|
||||
{
|
||||
let write_txn = env.begin_rw_txn().unwrap();
|
||||
let db = write_txn.open_db(None).unwrap();
|
||||
write_txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
|
||||
write_txn.commit().unwrap();
|
||||
}
|
||||
|
||||
let db1 = ro_txn.open_db(None).unwrap();
|
||||
let db2 = cloned_txn.open_db(None).unwrap();
|
||||
|
||||
assert_eq!(ro_txn.get::<()>(db1.dbi(), b"key2").unwrap(), None);
|
||||
assert_eq!(cloned_txn.get::<()>(db2.dbi(), b"key2").unwrap(), None);
|
||||
|
||||
let new_txn = env.begin_ro_txn().unwrap();
|
||||
let new_db = new_txn.open_db(None).unwrap();
|
||||
assert_eq!(new_txn.get::<[u8; 4]>(new_db.dbi(), b"key2").unwrap(), Some(*b"val2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clone_txn_parallel_reads() {
|
||||
let dir = tempdir().unwrap();
|
||||
let env: Arc<Environment> = Arc::new(Environment::builder().open(dir.path()).unwrap());
|
||||
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
|
||||
for i in 0..100 {
|
||||
txn.put(
|
||||
db.dbi(),
|
||||
format!("key{i:02}").as_bytes(),
|
||||
format!("val{i:02}").as_bytes(),
|
||||
WriteFlags::empty(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
txn.commit().unwrap();
|
||||
|
||||
let base_txn = env.begin_ro_txn().unwrap();
|
||||
let base_id = base_txn.id().unwrap();
|
||||
|
||||
let n = 4usize;
|
||||
let barrier = Arc::new(Barrier::new(n));
|
||||
let mut handles: Vec<JoinHandle<(u64, usize)>> = Vec::with_capacity(n);
|
||||
|
||||
for i in 0..n {
|
||||
let cloned_txn = base_txn.clone_txn().unwrap();
|
||||
let thread_barrier = barrier.clone();
|
||||
|
||||
handles.push(thread::spawn(move || {
|
||||
thread_barrier.wait();
|
||||
|
||||
let txn_id = cloned_txn.id().unwrap();
|
||||
let db = cloned_txn.open_db(None).unwrap();
|
||||
let mut count = 0usize;
|
||||
|
||||
for j in (i * 25)..((i + 1) * 25) {
|
||||
let val: Cow<'_, [u8]> =
|
||||
cloned_txn.get(db.dbi(), format!("key{j:02}").as_bytes()).unwrap().unwrap();
|
||||
assert!(val.starts_with(b"val"));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
(txn_id, count)
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (txn_id, count) = handle.join().unwrap();
|
||||
assert_eq!(txn_id, base_id);
|
||||
assert_eq!(count, 25);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,21 +187,6 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
}
|
||||
|
||||
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
|
||||
///
|
||||
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
|
||||
/// `None` for other variants.
|
||||
///
|
||||
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
|
||||
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pub fn into_raw_rocksdb_batch(self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
|
||||
match self {
|
||||
Self::Database(_) | Self::StaticFile(_) => None,
|
||||
Self::RocksDB(batch) => Some(batch.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the block number.
|
||||
///
|
||||
/// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`].
|
||||
@@ -319,24 +304,13 @@ where
|
||||
CURSOR: DbCursorRW<tables::TransactionHashNumbers> + DbCursorRO<tables::TransactionHashNumbers>,
|
||||
{
|
||||
/// Puts a transaction hash number mapping.
|
||||
///
|
||||
/// When `append_only` is true, uses `cursor.append()` which is significantly faster
|
||||
/// but requires entries to be inserted in order and the table to be empty.
|
||||
/// When false, uses `cursor.insert()` which handles arbitrary insertion order.
|
||||
pub fn put_transaction_hash_number(
|
||||
&mut self,
|
||||
hash: TxHash,
|
||||
tx_num: TxNumber,
|
||||
append_only: bool,
|
||||
) -> ProviderResult<()> {
|
||||
match self {
|
||||
Self::Database(cursor) => {
|
||||
if append_only {
|
||||
Ok(cursor.append(hash, &tx_num)?)
|
||||
} else {
|
||||
Ok(cursor.insert(hash, &tx_num)?)
|
||||
}
|
||||
}
|
||||
Self::Database(cursor) => Ok(cursor.upsert(hash, &tx_num)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
|
||||
@@ -689,18 +663,12 @@ mod tests {
|
||||
|
||||
#[cfg(all(test, unix, feature = "rocksdb"))]
|
||||
mod rocksdb_tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
providers::rocksdb::{RocksDBBuilder, RocksDBProvider},
|
||||
test_utils::create_test_provider_factory,
|
||||
RocksDBProviderFactory,
|
||||
};
|
||||
use crate::providers::rocksdb::{RocksDBBuilder, RocksDBProvider};
|
||||
use alloy_primitives::{Address, B256};
|
||||
use reth_db_api::{
|
||||
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
|
||||
tables,
|
||||
};
|
||||
use reth_storage_api::{DatabaseProviderFactory, StorageSettings};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn create_rocksdb_provider() -> (TempDir, RocksDBProvider) {
|
||||
@@ -714,87 +682,6 @@ mod rocksdb_tests {
|
||||
(temp_dir, provider)
|
||||
}
|
||||
|
||||
/// Test that `EitherWriter::new_transaction_hash_numbers` creates a `RocksDB` writer
|
||||
/// when the storage setting is enabled, and that put operations followed by commit
|
||||
/// persist the data to `RocksDB`.
|
||||
#[test]
|
||||
fn test_either_writer_transaction_hash_numbers_with_rocksdb() {
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Enable RocksDB for transaction hash numbers
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let hash1 = B256::from([1u8; 32]);
|
||||
let hash2 = B256::from([2u8; 32]);
|
||||
let tx_num1 = 100u64;
|
||||
let tx_num2 = 200u64;
|
||||
|
||||
// Get the RocksDB batch from the provider
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let batch = rocksdb.batch();
|
||||
|
||||
// Create EitherWriter with RocksDB
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
|
||||
// Verify we got a RocksDB writer
|
||||
assert!(matches!(writer, EitherWriter::RocksDB(_)));
|
||||
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
|
||||
// Extract the batch and register with provider for commit
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
// Commit via provider - this commits RocksDB batch too
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify data was written to RocksDB
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(), Some(tx_num1));
|
||||
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(), Some(tx_num2));
|
||||
}
|
||||
|
||||
/// Test that `EitherWriter::delete_transaction_hash_number` works with `RocksDB`.
|
||||
#[test]
|
||||
fn test_either_writer_delete_transaction_hash_number_with_rocksdb() {
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Enable RocksDB for transaction hash numbers
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let hash = B256::from([1u8; 32]);
|
||||
let tx_num = 100u64;
|
||||
|
||||
// First, write a value directly to RocksDB
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
rocksdb.put::<tables::TransactionHashNumbers>(hash, &tx_num).unwrap();
|
||||
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
|
||||
|
||||
// Now delete using EitherWriter
|
||||
let batch = rocksdb.batch();
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
writer.delete_transaction_hash_number(hash).unwrap();
|
||||
|
||||
// Extract the batch and commit via provider
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Verify deletion
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rocksdb_batch_transaction_hash_numbers() {
|
||||
let (_temp_dir, provider) = create_rocksdb_provider();
|
||||
@@ -929,65 +816,4 @@ mod rocksdb_tests {
|
||||
// Verify deletion
|
||||
assert_eq!(provider.get::<tables::AccountsHistory>(key).unwrap(), None);
|
||||
}
|
||||
|
||||
/// Test that `RocksDB` commits happen at `provider.commit()` level, not at writer level.
|
||||
///
|
||||
/// This ensures all storage commits (MDBX, static files, `RocksDB`) happen atomically
|
||||
/// in a single place, making it easier to reason about commit ordering and consistency.
|
||||
#[test]
|
||||
fn test_rocksdb_commits_at_provider_level() {
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
// Enable RocksDB for transaction hash numbers
|
||||
factory.set_storage_settings_cache(
|
||||
StorageSettings::legacy().with_transaction_hash_numbers_in_rocksdb(true),
|
||||
);
|
||||
|
||||
let hash1 = B256::from([1u8; 32]);
|
||||
let hash2 = B256::from([2u8; 32]);
|
||||
let tx_num1 = 100u64;
|
||||
let tx_num2 = 200u64;
|
||||
|
||||
// Get the RocksDB batch from the provider
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let batch = rocksdb.batch();
|
||||
|
||||
// Create provider and EitherWriter
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
|
||||
// Extract the raw batch from the writer and register it with the provider
|
||||
let raw_batch = writer.into_raw_rocksdb_batch();
|
||||
if let Some(batch) = raw_batch {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
// Data should NOT be visible yet (batch not committed)
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
assert_eq!(
|
||||
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
|
||||
None,
|
||||
"Data should not be visible before provider.commit()"
|
||||
);
|
||||
|
||||
// Commit the provider - this should commit both MDBX and RocksDB
|
||||
provider.commit().unwrap();
|
||||
|
||||
// Now data should be visible in RocksDB
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
assert_eq!(
|
||||
rocksdb.get::<tables::TransactionHashNumbers>(hash1).unwrap(),
|
||||
Some(tx_num1),
|
||||
"Data should be visible after provider.commit()"
|
||||
);
|
||||
assert_eq!(
|
||||
rocksdb.get::<tables::TransactionHashNumbers>(hash2).unwrap(),
|
||||
Some(tx_num2),
|
||||
"Data should be visible after provider.commit()"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,11 +181,6 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.database.rocksdb_provider()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {
|
||||
|
||||
@@ -153,11 +153,6 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {
|
||||
|
||||
@@ -151,6 +151,7 @@ impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
|
||||
|
||||
/// A provider struct that fetches data from the database.
|
||||
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
/// Database transaction.
|
||||
tx: TX,
|
||||
@@ -166,29 +167,10 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
|
||||
storage_settings: Arc<RwLock<StorageSettings>>,
|
||||
/// `RocksDB` provider
|
||||
rocksdb_provider: RocksDBProvider,
|
||||
/// Pending `RocksDB` batches to be committed at provider commit time.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
|
||||
/// Minimum distance from tip required for pruning
|
||||
minimum_pruning_distance: u64,
|
||||
}
|
||||
|
||||
impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let mut s = f.debug_struct("DatabaseProvider");
|
||||
s.field("tx", &self.tx)
|
||||
.field("chain_spec", &self.chain_spec)
|
||||
.field("static_file_provider", &self.static_file_provider)
|
||||
.field("prune_modes", &self.prune_modes)
|
||||
.field("storage", &self.storage)
|
||||
.field("storage_settings", &self.storage_settings)
|
||||
.field("rocksdb_provider", &self.rocksdb_provider);
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
s.field("pending_rocksdb_batches", &"<pending batches>");
|
||||
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
/// Returns reference to prune modes.
|
||||
pub const fn prune_modes_ref(&self) -> &PruneModes {
|
||||
@@ -277,11 +259,6 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
self.rocksdb_provider.clone()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
self.pending_rocksdb_batches.lock().push(batch);
|
||||
}
|
||||
}
|
||||
|
||||
impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
|
||||
@@ -313,8 +290,6 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
}
|
||||
}
|
||||
@@ -570,8 +545,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
|
||||
storage,
|
||||
storage_settings,
|
||||
rocksdb_provider,
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
|
||||
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
|
||||
}
|
||||
}
|
||||
@@ -3205,7 +3178,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
self.prune_modes_ref()
|
||||
}
|
||||
|
||||
/// Commit database transaction, static files, and pending `RocksDB` batches.
|
||||
/// Commit database transaction and static files.
|
||||
fn commit(self) -> ProviderResult<bool> {
|
||||
// For unwinding it makes more sense to commit the database first, since if
|
||||
// it is interrupted before the static files commit, we can just
|
||||
@@ -3213,27 +3186,9 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
// checkpoints on the next start-up.
|
||||
if self.static_file_provider.has_unwind_queued() {
|
||||
self.tx.commit()?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.static_file_provider.commit()?;
|
||||
} else {
|
||||
self.static_file_provider.commit()?;
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
{
|
||||
let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
|
||||
for batch in batches {
|
||||
self.rocksdb_provider.commit_batch(batch)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.tx.commit()?;
|
||||
}
|
||||
|
||||
|
||||
@@ -450,19 +450,6 @@ impl RocksDBProvider {
|
||||
batch_handle.commit()
|
||||
})
|
||||
}
|
||||
|
||||
/// Commits a raw `WriteBatchWithTransaction` to `RocksDB`.
|
||||
///
|
||||
/// This is used when the batch was extracted via [`RocksDBBatch::into_inner`]
|
||||
/// and needs to be committed at a later point (e.g., at provider commit time).
|
||||
pub fn commit_batch(&self, batch: WriteBatchWithTransaction<true>) -> ProviderResult<()> {
|
||||
self.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle for building a batch of operations atomically.
|
||||
@@ -542,13 +529,6 @@ impl<'a> RocksDBBatch<'a> {
|
||||
pub const fn provider(&self) -> &RocksDBProvider {
|
||||
self.provider
|
||||
}
|
||||
|
||||
/// Consumes the batch and returns the underlying `WriteBatchWithTransaction`.
|
||||
///
|
||||
/// This is used to defer commits to the provider level.
|
||||
pub fn into_inner(self) -> WriteBatchWithTransaction<true> {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBBuilder, StaticFileProvider},
|
||||
providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
|
||||
HashingWriter, ProviderFactory, TrieWriter,
|
||||
};
|
||||
use alloy_primitives::B256;
|
||||
@@ -62,10 +62,7 @@ pub fn create_test_provider_factory_with_node_types<N: NodeTypesForProvider>(
|
||||
db,
|
||||
chain_spec,
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBBuilder::new(&rocksdb_dir)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.expect("failed to create test RocksDB provider"),
|
||||
RocksDBProvider::new(&rocksdb_dir).expect("failed to create test RocksDB provider"),
|
||||
)
|
||||
.expect("failed to create test provider factory")
|
||||
}
|
||||
|
||||
@@ -29,9 +29,4 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider {
|
||||
RocksDBProvider::builder(PathBuf::default()).build().unwrap()
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
|
||||
// No-op for NoopProvider
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,11 +6,4 @@ use crate::providers::RocksDBProvider;
|
||||
pub trait RocksDBProviderFactory {
|
||||
/// Returns the `RocksDB` provider.
|
||||
fn rocksdb_provider(&self) -> RocksDBProvider;
|
||||
|
||||
/// Adds a pending `RocksDB` batch to be committed when this provider is committed.
|
||||
///
|
||||
/// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file
|
||||
/// commits, ensuring atomicity across all storage backends.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user