Compare commits

...

8 Commits

Author SHA1 Message Date
Tempo AI
aa087ac553 fix: address CI issues for cursor lifetimes PR
- Fix clippy if-not-else lint in record_client_version
- Add explicit lifetime to Cursor in benches to fix elided-lifetimes-in-paths
- Scope cursor usage in test to drop before commit (required by cursor lifetimes)
2026-01-28 19:33:38 +00:00
Brian Picciano
ede72ca909 chore: merge origin/main into tx-cursor-lifetimes
Resolve merge conflicts while preserving cursor reuse functionality:
- Keep 'tx lifetime parameters on cursor GAT types (required for cursor reuse)
- Do not add Send bounds (libmdbx cursors are intentionally !Send)
- Use split borrow pattern to avoid borrow checker conflicts in cursor write methods
- Adopt encoded_key naming from main for consistency
2026-01-28 17:19:28 +01:00
Brian Picciano
d17f99330d feat(db): add cursor caching for transaction cursor reuse
- Add CursorCache struct storing (table_index, cursor_ptr) pairs in a Vec
- Cursors returned to cache on drop via into_raw/from_raw pattern
- cursor_read/cursor_write now use cached cursors by default
- Add Table::INDEX const for O(1) cache lookup
- Fix tests to drop cursors before tx.commit() (required by cursor lifetimes)
- Remove Sync bound from Database::TX/TXMut (transactions are not thread-shared)
2026-01-17 15:04:05 +00:00
Brian Picciano
747a6409c1 fix(reth-cli-commands): use separate read/write transactions for repair-trie
Instead of trying to borrow the same provider mutably and immutably,
use two separate database transactions:
- A read-only transaction for the Verifier cursor factories
- A read-write transaction for the write cursors

This avoids the borrow conflict that arose from cursor lifetime parameters
binding cursors to their parent transaction.
2026-01-17 00:08:35 +00:00
Brian Picciano
abccf7907d fix(reth-cli-commands): temporarily disable repair-trie command pending cursor lifetime refactoring
The verify_and_repair function has a borrow conflict where it simultaneously
borrows provider_rw mutably (for write cursors) and immutably (for cursor
factories). With the new cursor lifetime parameters, this is correctly caught.

The fix is to restructure into two phases (collect repairs, then apply), but
for now we disable the command with a helpful error message pointing users
to --dry-run for verification.
2026-01-16 23:45:25 +00:00
Brian Picciano
b1ee1f812d fix(reth-prune): add missing lifetime parameter to CursorMut in prune_table_with_range_step 2026-01-16 23:41:22 +00:00
Brian Picciano
8f20c328ff fix(reth-provider): update cursor type aliases and factory methods for lifetime parameters
- Add lifetime parameter to CursorTy, DupCursorTy, CursorMutTy, DupCursorMutTy type aliases
- Update EitherWriter/EitherReader constructor functions to use &'a P for provider parameter
- Inline cursor creation in OverlayStateProvider to avoid returning cursors that reference local variables
- Import concrete cursor types (DatabaseAccountTrieCursor, DatabaseHashedAccountCursor, etc.) for direct cursor construction
2026-01-16 23:40:42 +00:00
Brian Picciano
103588893d feat(db): add lifetime to cursor types binding them to transactions
- Change reth_libmdbx::Cursor<K> to Cursor<'tx, K> holding &'tx Transaction<K>
- Make cursors !Send and !Sync via PhantomData<*const ()>
- Add lifetime parameter to DbTx::Cursor and DbTxMut::CursorMut traits
- Update reth-db Cursor wrapper with lifetime
- Update reth-trie-db cursor factory impls with lifetimes
- Remove Send + Sync bounds from trie cursor impls

This ensures cursors cannot outlive their parent transaction,
preventing use-after-free bugs at compile time.
2026-01-16 23:36:46 +00:00
20 changed files with 716 additions and 357 deletions

View File

@@ -199,8 +199,8 @@ fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()>
}
fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
// Get a read-write database provider
let mut provider_rw = tool.provider_factory.provider_rw()?;
// Get a read-write database provider for checkpoints and committing
let provider_rw = tool.provider_factory.provider_rw()?;
// Log the database block tip from Finish stage checkpoint
let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
@@ -209,17 +209,21 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
// Check that a pipeline sync isn't in progress.
verify_checkpoints(provider_rw.as_ref())?;
// Create cursors for making modifications with
let tx = provider_rw.tx_mut();
tx.disable_long_read_transaction_safety();
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
// Get the database reference for creating transactions
let db = tool.provider_factory.db_ref();
// Create the cursor factories. These cannot accept the `&mut` tx above because they require it
// to be AsRef.
let tx = provider_rw.tx_ref();
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
// Create a read-only transaction for the Verifier
let mut read_tx = db.tx()?;
read_tx.disable_long_read_transaction_safety();
// Create a read-write transaction for the write cursors
let write_tx = db.tx_mut()?;
let mut account_trie_cursor = write_tx.cursor_write::<tables::AccountsTrie>()?;
let mut storage_trie_cursor = write_tx.cursor_dup_write::<tables::StoragesTrie>()?;
// Create the cursor factories using the read-only transaction
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&read_tx);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(&read_tx);
// Create the verifier
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
@@ -304,10 +308,15 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
}
}
// Drop read transaction and cursors before committing
drop(account_trie_cursor);
drop(storage_trie_cursor);
drop(read_tx);
if inconsistent_nodes == 0 {
info!("No inconsistencies found");
} else {
provider_rw.commit()?;
write_tx.commit()?;
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
}

View File

@@ -116,7 +116,7 @@ pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
/// timing out.
fn prune_table_with_range_step<T: Table>(
&self,
walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
walker: &mut RangeWalker<'_, T, Self::CursorMut<'_, T>>,
limiter: &mut PruneLimiter,
skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
delete_callback: &mut impl FnMut(TableRow<T>),

View File

@@ -66,8 +66,14 @@ pub struct TxMock {
}
impl DbTx for TxMock {
type Cursor<T: Table> = CursorMock;
type DupCursor<T: DupSort> = CursorMock;
type Cursor<'tx, T: Table>
= CursorMock
where
Self: 'tx;
type DupCursor<'tx, T: DupSort>
= CursorMock
where
Self: 'tx;
/// Retrieves a value by key from the specified table.
///
@@ -108,7 +114,7 @@ impl DbTx for TxMock {
///
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
/// iterate over any data (all cursor operations return `None`).
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
@@ -116,7 +122,7 @@ impl DbTx for TxMock {
///
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
/// iterate over any data (all cursor operations return `None`).
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
@@ -136,8 +142,14 @@ impl DbTx for TxMock {
}
impl DbTxMut for TxMock {
type CursorMut<T: Table> = CursorMock;
type DupCursorMut<T: DupSort> = CursorMock;
type CursorMut<'tx, T: Table>
= CursorMock
where
Self: 'tx;
type DupCursorMut<'tx, T: DupSort>
= CursorMock
where
Self: 'tx;
/// Inserts or updates a key-value pair in the specified table.
///
@@ -172,7 +184,7 @@ impl DbTxMut for TxMock {
///
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
/// iterate over any data and all write operations will be no-ops.
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
@@ -180,7 +192,7 @@ impl DbTxMut for TxMock {
///
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
/// iterate over any data and all write operations will be no-ops.
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
}

View File

@@ -121,6 +121,17 @@ pub trait Table: Send + Sync + Debug + 'static {
/// Whether the table is also a `DUPSORT` table.
const DUPSORT: bool;
/// Unique index of this table for O(1) cache slot access.
///
/// This is automatically generated by the `tables!` macro based on declaration order.
///
/// # Stability
///
/// **This index is NOT stable across versions.** It may change when tables are added,
/// removed, or reordered. Use only for runtime operations (e.g., cursor caching),
/// never for persistent storage or cross-process communication.
const INDEX: usize;
/// Key element of `Table`.
///
/// Sorting should be taken into account when encoding this.

View File

@@ -124,6 +124,8 @@ macro_rules! tables {
concat!("`", stringify!($value), "`")
};
($($(#[$attr:meta])* table $name:ident$(<$($generic:ident $(= $default:ty)?),*>)? { type Key = $key:ty; type Value = $value:ty; $(type SubKey = $subkey:ty;)? } )*) => {
// Table marker types.
$(
@@ -152,6 +154,7 @@ macro_rules! tables {
{
const NAME: &'static str = table_names::$name;
const DUPSORT: bool = tables!(@bool $($subkey)?);
const INDEX: usize = Tables::$name.index();
type Key = $key;
type Value = $value;
@@ -182,6 +185,30 @@ macro_rules! tables {
/// The number of tables in the database.
pub const COUNT: usize = Self::ALL.len();
/// Returns the unique index of this table (0..COUNT).
///
/// This is used for O(1) cursor cache slot access.
///
/// # Stability
///
/// **This index is NOT stable across versions.** It is based on declaration
/// order in the `tables!` macro and may change when tables are added, removed,
/// or reordered. Use only for runtime operations, never for persistent storage.
#[allow(clippy::enum_glob_use)]
pub const fn index(&self) -> usize {
use Tables::*;
// Generate a match with incrementing indices
let mut idx = 0usize;
$(
if matches!(self, $name) {
return idx;
}
idx += 1;
)*
let _ = idx; // suppress unused warning
unreachable!()
}
/// Returns the name of the table as a string.
pub const fn name(&self) -> &'static str {
match self {

View File

@@ -17,6 +17,7 @@ pub struct RawTable<T: Table> {
impl<T: Table> Table for RawTable<T> {
const NAME: &'static str = T::NAME;
const DUPSORT: bool = false;
const INDEX: usize = T::INDEX;
type Key = RawKey<T::Key>;
type Value = RawValue<T::Value>;
@@ -32,6 +33,7 @@ pub struct RawDupSort<T: DupSort> {
impl<T: DupSort> Table for RawDupSort<T> {
const NAME: &'static str = T::NAME;
const DUPSORT: bool = true;
const INDEX: usize = T::INDEX;
type Key = RawKey<T::Key>;
type Value = RawValue<T::Value>;

View File

@@ -6,23 +6,27 @@ use crate::{
use std::fmt::Debug;
/// Helper adapter type for accessing [`DbTx`] cursor.
pub type CursorTy<TX, T> = <TX as DbTx>::Cursor<T>;
pub type CursorTy<'tx, TX, T> = <TX as DbTx>::Cursor<'tx, T>;
/// Helper adapter type for accessing [`DbTx`] dup cursor.
pub type DupCursorTy<TX, T> = <TX as DbTx>::DupCursor<T>;
pub type DupCursorTy<'tx, TX, T> = <TX as DbTx>::DupCursor<'tx, T>;
/// Helper adapter type for accessing [`DbTxMut`] mutable cursor.
pub type CursorMutTy<TX, T> = <TX as DbTxMut>::CursorMut<T>;
pub type CursorMutTy<'tx, TX, T> = <TX as DbTxMut>::CursorMut<'tx, T>;
/// Helper adapter type for accessing [`DbTxMut`] mutable dup cursor.
pub type DupCursorMutTy<TX, T> = <TX as DbTxMut>::DupCursorMut<T>;
pub type DupCursorMutTy<'tx, TX, T> = <TX as DbTxMut>::DupCursorMut<'tx, T>;
/// Read only transaction
pub trait DbTx: Debug + Send {
/// Cursor type for this read-only transaction
type Cursor<T: Table>: DbCursorRO<T> + Send;
type Cursor<'tx, T: Table>: DbCursorRO<T>
where
Self: 'tx;
/// `DupCursor` type for this read-only transaction
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send;
type DupCursor<'tx, T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T>
where
Self: 'tx;
/// Get value by an owned key
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, DatabaseError>;
@@ -39,9 +43,9 @@ pub trait DbTx: Debug + Send {
/// Aborts transaction
fn abort(self);
/// Iterate over read only values in table.
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError>;
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError>;
/// Iterate over read only values in dup sorted table.
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError>;
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError>;
/// Returns number of entries in the table.
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
/// Disables long-lived read transaction safety guarantees.
@@ -51,13 +55,16 @@ pub trait DbTx: Debug + Send {
/// Read write transaction that allows writing to database
pub trait DbTxMut: Send {
/// Read-Write Cursor type
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send;
type CursorMut<'tx, T: Table>: DbCursorRW<T> + DbCursorRO<T>
where
Self: 'tx;
/// Read-Write `DupCursor` type
type DupCursorMut<T: DupSort>: DbDupCursorRW<T>
type DupCursorMut<'tx, T: DupSort>: DbDupCursorRW<T>
+ DbCursorRW<T>
+ DbDupCursorRO<T>
+ DbCursorRO<T>
+ Send;
where
Self: 'tx;
/// Put value to database
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
@@ -73,7 +80,7 @@ pub trait DbTxMut: Send {
/// Clears database.
fn clear<T: Table>(&self) -> Result<(), DatabaseError>;
/// Cursor mut
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError>;
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError>;
/// `DupCursor` mut.
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError>;
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError>;
}

View File

@@ -1,6 +1,6 @@
//! Cursor wrapper for libmdbx-sys.
use super::utils::*;
use super::{cursor_cache::CursorCache, utils::*};
use crate::{
metrics::{DatabaseEnvMetrics, Operation},
DatabaseError,
@@ -13,34 +13,56 @@ use reth_db_api::{
},
table::{Compress, Decode, Decompress, DupSort, Encode, IntoVec, Table},
};
use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use reth_libmdbx::{ffi, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
/// Read only Cursor.
pub type CursorRO<T> = Cursor<RO, T>;
pub type CursorRO<'tx, T> = Cursor<'tx, RO, T>;
/// Read write cursor.
pub type CursorRW<T> = Cursor<RW, T>;
pub type CursorRW<'tx, T> = Cursor<'tx, RW, T>;
/// Cursor wrapper to access KV items.
///
/// When dropped, the cursor is returned to the transaction's cursor cache for reuse,
/// rather than being closed. This reduces the overhead of cursor creation in
/// cursor-heavy workloads.
#[derive(Debug)]
pub struct Cursor<K: TransactionKind, T: Table> {
/// Inner `libmdbx` cursor.
pub(crate) inner: reth_libmdbx::Cursor<K>,
pub struct Cursor<'tx, K: TransactionKind, T: Table> {
/// Inner `libmdbx` cursor. Wrapped in Option so we can take it in Drop.
pub(crate) inner: Option<reth_libmdbx::Cursor<'tx, K>>,
/// Cache buffer that receives compressed values.
buf: Vec<u8>,
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
metrics: Option<Arc<DatabaseEnvMetrics>>,
/// Reference to the cursor cache where this cursor should be returned on drop.
/// If `None`, the cursor is closed normally on drop.
cache: Option<&'tx CursorCache>,
/// Phantom data to enforce encoding/decoding.
_dbi: PhantomData<T>,
}
impl<K: TransactionKind, T: Table> Cursor<K, T> {
pub(crate) const fn new_with_metrics(
inner: reth_libmdbx::Cursor<K>,
#[expect(clippy::missing_const_for_fn)]
impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> {
pub(crate) fn new_with_metrics(
inner: reth_libmdbx::Cursor<'tx, K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
) -> Self {
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
Self { inner: Some(inner), buf: Vec::new(), metrics, cache: None, _dbi: PhantomData }
}
pub(crate) fn new_with_cache(
inner: reth_libmdbx::Cursor<'tx, K>,
metrics: Option<Arc<DatabaseEnvMetrics>>,
cache: &'tx CursorCache,
) -> Self {
Self { inner: Some(inner), buf: Vec::new(), metrics, cache: Some(cache), _dbi: PhantomData }
}
/// Returns a mutable reference to the inner cursor.
#[inline]
fn inner_mut(&mut self) -> &mut reth_libmdbx::Cursor<'tx, K> {
self.inner.as_mut().expect("cursor already taken")
}
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
@@ -61,6 +83,25 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
}
}
impl<K: TransactionKind, T: Table> Drop for Cursor<'_, K, T> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() &&
let Some(cache) = self.cache
{
// Return cursor to cache for reuse
let raw = inner.into_raw();
let old = cache.put(T::INDEX, raw);
// If there was already a cursor in the slot (shouldn't happen), close it
if let Some(old_cursor) = old {
unsafe {
ffi::mdbx_cursor_close(old_cursor);
}
}
}
// If no cache or no inner, the cursor drops normally and closes itself
}
}
/// Decodes a `(key, value)` pair from the database.
#[expect(clippy::type_complexity)]
pub fn decode<T>(
@@ -88,38 +129,38 @@ macro_rules! compress_to_buf_or_ref {
};
}
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<'_, K, T> {
fn first(&mut self) -> PairResult<T> {
decode::<T>(self.inner.first())
decode::<T>(self.inner_mut().first())
}
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_key(key.encode().as_ref()))
decode::<T>(self.inner_mut().set_key(key.encode().as_ref()))
}
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
decode::<T>(self.inner.set_range(key.encode().as_ref()))
decode::<T>(self.inner_mut().set_range(key.encode().as_ref()))
}
fn next(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next())
decode::<T>(self.inner_mut().next())
}
fn prev(&mut self) -> PairResult<T> {
decode::<T>(self.inner.prev())
decode::<T>(self.inner_mut().prev())
}
fn last(&mut self) -> PairResult<T> {
decode::<T>(self.inner.last())
decode::<T>(self.inner_mut().last())
}
fn current(&mut self) -> PairResult<T> {
decode::<T>(self.inner.get_current())
decode::<T>(self.inner_mut().get_current())
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
decode::<T>(self.inner_mut().set_range(start_key.encode().as_ref())).transpose()
} else {
self.first().transpose()
};
@@ -132,11 +173,11 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
let start = match range.start_bound().cloned() {
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
Bound::Included(key) => self.inner_mut().set_range(key.encode().as_ref()),
Bound::Excluded(_key) => {
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
}
Bound::Unbounded => self.inner.first(),
Bound::Unbounded => self.inner_mut().first(),
};
let start = decode::<T>(start).transpose();
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
@@ -147,7 +188,7 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start = if let Some(start_key) = start_key {
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
decode::<T>(self.inner_mut().set_range(start_key.encode().as_ref()))
} else {
self.last()
}
@@ -157,20 +198,20 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
}
}
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<'_, K, T> {
/// Returns the previous `(key, value)` pair of a DUPSORT table.
fn prev_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.prev_dup())
decode::<T>(self.inner_mut().prev_dup())
}
/// Returns the next `(key, value)` pair of a DUPSORT table.
fn next_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_dup())
decode::<T>(self.inner_mut().next_dup())
}
/// Returns the last `value` of the current duplicate `key`.
fn last_dup(&mut self) -> ValueOnlyResult<T> {
self.inner
self.inner_mut()
.last_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
@@ -179,12 +220,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_nodup())
decode::<T>(self.inner_mut().next_nodup())
}
/// Returns the next `value` of a duplicate `key`.
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner
self.inner_mut()
.next_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_value::<T>)
@@ -196,7 +237,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
key: <T as Table>::Key,
subkey: <T as DupSort>::SubKey,
) -> ValueOnlyResult<T> {
self.inner
self.inner_mut()
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
@@ -216,14 +257,14 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
let start = match (key, subkey) {
(Some(key), Some(subkey)) => {
let encoded_key = key.encode();
self.inner
self.inner_mut()
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
}
(Some(key), None) => {
let encoded_key = key.encode();
self.inner
self.inner_mut()
.set(encoded_key.as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
@@ -231,7 +272,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
(None, Some(subkey)) => {
if let Some((key, _)) = self.first()? {
let encoded_key = key.encode();
self.inner
self.inner_mut()
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
.map_err(|e| DatabaseError::Read(e.into()))?
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
@@ -246,7 +287,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
}
}
impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
impl<T: Table> DbCursorRW<T> for Cursor<'_, RW, T> {
/// Database operation that will update an existing row if a specified value already
/// exists in a table, and insert a new row if the specified value doesn't already exist
///
@@ -261,17 +302,20 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
Operation::CursorUpsert,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorUpsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
let value_bytes: &[u8] = match value {
Some(v) => v,
None => &this.buf,
};
let inner = this.inner.as_mut().expect("cursor already taken");
inner.put(key.as_ref(), value_bytes, WriteFlags::UPSERT).map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorUpsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
},
)
}
@@ -283,17 +327,20 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
Operation::CursorInsert,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
let value_bytes: &[u8] = match value {
Some(v) => v,
None => &this.buf,
};
let inner = this.inner.as_mut().expect("cursor already taken");
inner.put(key.as_ref(), value_bytes, WriteFlags::NO_OVERWRITE).map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
},
)
}
@@ -307,32 +354,37 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
Operation::CursorAppend,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
let value_bytes: &[u8] = match value {
Some(v) => v,
None => &this.buf,
};
let inner = this.inner.as_mut().expect("cursor already taken");
inner.put(key.as_ref(), value_bytes, WriteFlags::APPEND).map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
},
)
}
fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
this.inner_mut().del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
})
}
}
impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
impl<T: DupSort> DbDupCursorRW<T> for Cursor<'_, RW, T> {
fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
this.inner_mut()
.del(WriteFlags::NO_DUP_DATA)
.map_err(|e| DatabaseError::Delete(e.into()))
})
}
@@ -343,17 +395,20 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
Operation::CursorAppendDup,
Some(value.unwrap_or(&self.buf).len()),
|this| {
this.inner
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
.map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
let value_bytes: &[u8] = match value {
Some(v) => v,
None => &this.buf,
};
let inner = this.inner.as_mut().expect("cursor already taken");
inner.put(key.as_ref(), value_bytes, WriteFlags::APPEND_DUP).map_err(|e| {
DatabaseWriteError {
info: e.into(),
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: T::NAME,
key: key.into_vec(),
}
.into()
})
},
)
}

View File

@@ -0,0 +1,123 @@
//! Cursor cache for reusing database cursors across operations.
//!
//! This module provides a cache for MDBX cursors, allowing them to be reused
//! rather than recreated for each operation. This reduces overhead in
//! cursor-heavy workloads like state iteration.
use reth_libmdbx::ffi;
use std::cell::UnsafeCell;
/// A wrapper around a raw MDBX cursor pointer that implements `Send`.
///
/// # Safety
///
/// MDBX cursors can be moved between threads as long as the owning transaction
/// is also moved. Since `Tx` is `Send`, the cursor can be too. The cursor must
/// not be used after the transaction is dropped.
#[derive(Debug)]
struct SendableCursor(*mut ffi::MDBX_cursor);
// SAFETY: MDBX cursors can be safely sent between threads when moved with their transaction.
// The transaction owns the cursor and ensures it's valid.
unsafe impl Send for SendableCursor {}
/// Cache for raw MDBX cursor pointers.
///
/// Stores cursors as `(table_index, cursor_ptr)` pairs. Most transactions only use
/// a few tables, so a small Vec with linear search is more memory-efficient than
/// a large fixed-size array.
///
/// Cursors are returned to the cache on drop via [`super::cursor::Cursor`] and
/// reused on subsequent `cursor_read`/`cursor_write` calls.
///
/// # Safety
///
/// The cache stores raw pointers to MDBX cursors. These are only valid while
/// the parent transaction is alive. The cache must be dropped before the
/// transaction is dropped, which is guaranteed by field ordering in `Tx`.
///
/// This type uses `UnsafeCell` for interior mutability and manually implements `Sync`.
/// This is safe because:
/// - MDBX transactions are not actually shared across threads (they're `Send` but single-threaded)
/// - The `Sync` bound on `Database::TX` is a historical API requirement, not a real usage pattern
/// - All access to the cache happens on a single thread that owns the transaction
pub struct CursorCache {
/// Cached cursors as `(table_index, cursor_ptr)` pairs.
entries: UnsafeCell<Vec<(usize, SendableCursor)>>,
}
// SAFETY: While CursorCache uses UnsafeCell, it's only accessed from a single thread.
// The Sync implementation is required because Database::TX requires Sync, but in practice
// transactions are never shared across threads - they're moved between threads, not shared.
unsafe impl Sync for CursorCache {}
impl std::fmt::Debug for CursorCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CursorCache").finish_non_exhaustive()
}
}
impl Default for CursorCache {
fn default() -> Self {
Self::new()
}
}
impl CursorCache {
/// Creates a new empty cursor cache.
pub const fn new() -> Self {
Self { entries: UnsafeCell::new(Vec::new()) }
}
/// Takes a cached cursor for the given table index, if one exists.
///
/// # Safety
///
/// This must only be called from a single thread (the one owning the transaction).
#[inline]
pub fn take(&self, index: usize) -> Option<*mut ffi::MDBX_cursor> {
// SAFETY: We only access this from a single thread (the transaction owner)
let entries = unsafe { &mut *self.entries.get() };
entries.iter().position(|(idx, _)| *idx == index).map(|pos| entries.swap_remove(pos).1 .0)
}
/// Stores a cursor in the cache for the given table index.
///
/// If a cursor already exists for this index, the old cursor is returned
/// and must be closed by the caller.
///
/// # Safety
///
/// This must only be called from a single thread (the one owning the transaction).
#[inline]
pub fn put(
&self,
index: usize,
cursor: *mut ffi::MDBX_cursor,
) -> Option<*mut ffi::MDBX_cursor> {
// SAFETY: We only access this from a single thread (the transaction owner)
let entries = unsafe { &mut *self.entries.get() };
// Check if there's already an entry for this index
if let Some(pos) = entries.iter().position(|(idx, _)| *idx == index) {
let old = std::mem::replace(&mut entries[pos].1, SendableCursor(cursor));
Some(old.0)
} else {
entries.push((index, SendableCursor(cursor)));
None
}
}
}
impl Drop for CursorCache {
fn drop(&mut self) {
// SAFETY: We have &mut self, so we have exclusive access
let entries = self.entries.get_mut();
// Close all cached cursors
for (_, SendableCursor(cursor)) in entries.drain(..) {
// SAFETY: We own these cursors and are dropping them before the transaction
unsafe {
ffi::mdbx_cursor_close(cursor);
}
}
}
}

View File

@@ -32,6 +32,7 @@ use std::{
use tx::Tx;
pub mod cursor;
pub mod cursor_cache;
pub mod tx;
mod utils;
@@ -597,14 +598,22 @@ impl DatabaseEnv {
}
let tx = self.tx_mut()?;
let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
let should_commit = {
let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
let last_version = version_cursor.last()?.map(|(_, v)| v);
if Some(&version) != last_version.as_ref() {
version_cursor.upsert(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
&version,
)?;
let last_version = version_cursor.last()?.map(|(_, v)| v);
if Some(&version) == last_version.as_ref() {
false
} else {
version_cursor.upsert(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
&version,
)?;
true
}
};
if should_commit {
tx.commit()?;
}
@@ -1089,29 +1098,32 @@ mod tests {
let key_to_insert = 2;
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
// INSERT
assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
// INSERT (failure)
assert!(matches!(
cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyExist.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: CanonicalHeaders::NAME,
key: key_to_insert.encode().into(),
}));
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
{
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
// INSERT
assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
// INSERT (failure)
assert!(matches!(
cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyExist.into(),
operation: DatabaseWriteOperation::CursorInsert,
table_name: CanonicalHeaders::NAME,
key: key_to_insert.encode().into(),
}));
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
}
tx.commit().expect(ERROR_COMMIT);
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
{
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
}
tx.commit().expect(ERROR_COMMIT);
}
@@ -1176,23 +1188,27 @@ mod tests {
tx.commit().expect(ERROR_COMMIT);
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
{
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
// INSERT (cursor starts at last)
cursor.last().unwrap();
assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
// INSERT (cursor starts at last)
cursor.last().unwrap();
assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
for pos in (2..=8).step_by(2) {
assert!(cursor.insert(pos, &B256::ZERO).is_ok());
assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
for pos in (2..=8).step_by(2) {
assert!(cursor.insert(pos, &B256::ZERO).is_ok());
assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
}
}
tx.commit().expect(ERROR_COMMIT);
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
{
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
tx.commit().expect(ERROR_COMMIT);
}
@@ -1211,15 +1227,19 @@ mod tests {
// APPEND
let key_to_append = 5;
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
{
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
}
tx.commit().expect(ERROR_COMMIT);
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
{
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
}
tx.commit().expect(ERROR_COMMIT);
}
@@ -1238,23 +1258,27 @@ mod tests {
// APPEND
let key_to_append = 2;
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
assert!(matches!(
cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyMismatch.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: CanonicalHeaders::NAME,
key: key_to_append.encode().into(),
}));
assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
{
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
assert!(matches!(
cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyMismatch.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: CanonicalHeaders::NAME,
key: key_to_append.encode().into(),
}));
assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
}
tx.commit().expect(ERROR_COMMIT);
// Confirm the result
let tx = db.tx().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 3, 4, 5]);
{
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
assert_eq!(res, vec![0, 1, 3, 4, 5]);
}
tx.commit().expect(ERROR_COMMIT);
}
@@ -1300,43 +1324,30 @@ mod tests {
let transition_id = 2;
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
vec![0, 1, 3, 4, 5]
.into_iter()
.try_for_each(|val| {
cursor.append(
transition_id,
&AccountBeforeTx { address: Address::with_last_byte(val), info: None },
)
})
.expect(ERROR_APPEND);
{
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
vec![0, 1, 3, 4, 5]
.into_iter()
.try_for_each(|val| {
cursor.append(
transition_id,
&AccountBeforeTx { address: Address::with_last_byte(val), info: None },
)
})
.expect(ERROR_APPEND);
}
tx.commit().expect(ERROR_COMMIT);
// APPEND DUP & APPEND
let subkey_to_append = 2;
let tx = db.tx_mut().expect(ERROR_INIT_TX);
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
assert!(matches!(
cursor
.append_dup(
transition_id,
AccountBeforeTx {
address: Address::with_last_byte(subkey_to_append),
info: None
}
)
.unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyMismatch.into(),
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: AccountChangeSets::NAME,
key: transition_id.encode().into(),
}));
assert!(matches!(
{
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
assert!(matches!(
cursor
.append(
transition_id - 1,
&AccountBeforeTx {
.append_dup(
transition_id,
AccountBeforeTx {
address: Address::with_last_byte(subkey_to_append),
info: None
}
@@ -1344,17 +1355,37 @@ mod tests {
.unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyMismatch.into(),
operation: DatabaseWriteOperation::CursorAppend,
operation: DatabaseWriteOperation::CursorAppendDup,
table_name: AccountChangeSets::NAME,
key: (transition_id - 1).encode().into(),
}
));
assert!(cursor
.append(
transition_id,
&AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
)
.is_ok());
key: transition_id.encode().into(),
}));
assert!(matches!(
cursor
.append(
transition_id - 1,
&AccountBeforeTx {
address: Address::with_last_byte(subkey_to_append),
info: None
}
)
.unwrap_err(),
DatabaseError::Write(err) if *err == DatabaseWriteError {
info: Error::KeyMismatch.into(),
operation: DatabaseWriteOperation::CursorAppend,
table_name: AccountChangeSets::NAME,
key: (transition_id - 1).encode().into(),
}
));
assert!(cursor
.append(
transition_id,
&AccountBeforeTx {
address: Address::with_last_byte(subkey_to_append),
info: None
}
)
.is_ok());
}
}
#[test]

View File

@@ -1,6 +1,6 @@
//! Transaction wrapper for libmdbx-sys.
use super::{cursor::Cursor, utils::*};
use super::{cursor::Cursor, cursor_cache::CursorCache, utils::*};
use crate::{
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
DatabaseError,
@@ -40,6 +40,10 @@ pub struct Tx<K: TransactionKind> {
///
/// If [Some], then metrics are reported.
metrics_handler: Option<MetricsHandler<K>>,
/// Cache for reusing cursors across operations.
/// Must be declared after `inner` to ensure cursors are closed before the transaction.
cursor_cache: CursorCache,
}
impl<K: TransactionKind> Tx<K> {
@@ -59,7 +63,7 @@ impl<K: TransactionKind> Tx<K> {
Ok(handler)
})
.transpose()?;
Ok(Self { inner, dbis, metrics_handler })
Ok(Self { inner, dbis, metrics_handler, cursor_cache: CursorCache::new() })
}
/// Returns a reference to the inner libmdbx transaction.
@@ -91,8 +95,8 @@ impl<K: TransactionKind> Tx<K> {
self.get_dbi_raw(T::NAME)
}
/// Create db Cursor
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
/// Create db Cursor without caching (cursor will be closed on drop).
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<'_, K, T>, DatabaseError> {
let inner = self
.inner
.cursor_with_dbi(self.get_dbi::<T>()?)
@@ -104,6 +108,31 @@ impl<K: TransactionKind> Tx<K> {
))
}
/// Create db Cursor with caching (cursor will be returned to cache on drop).
///
/// If a cursor for this table is already in the cache, it will be reused.
/// Otherwise, a new cursor is created. Either way, when the returned cursor
/// is dropped, it will be stored in the cache for future reuse.
pub fn new_cursor_cached<T: Table>(&self) -> Result<Cursor<'_, K, T>, DatabaseError> {
// Try to get a cached cursor
let inner = if let Some(raw_cursor) = self.cursor_cache.take(T::INDEX) {
// SAFETY: The raw cursor was created for this transaction and the
// transaction is still alive (we have &self).
unsafe { reth_libmdbx::Cursor::from_raw(&self.inner, raw_cursor) }
} else {
// Create a new cursor
self.inner
.cursor_with_dbi(self.get_dbi::<T>()?)
.map_err(|e| DatabaseError::InitCursor(e.into()))?
};
Ok(Cursor::new_with_cache(
inner,
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
&self.cursor_cache,
))
}
/// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
/// record a metric with the provided transaction outcome.
///
@@ -288,8 +317,14 @@ impl<K: TransactionKind> Drop for MetricsHandler<K> {
impl TableImporter for Tx<RW> {}
impl<K: TransactionKind> DbTx for Tx<K> {
type Cursor<T: Table> = Cursor<K, T>;
type DupCursor<T: DupSort> = Cursor<K, T>;
type Cursor<'tx, T: Table>
= Cursor<'tx, K, T>
where
Self: 'tx;
type DupCursor<'tx, T: DupSort>
= Cursor<'tx, K, T>
where
Self: 'tx;
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
self.get_by_encoded_key::<T>(&key.encode())
@@ -323,13 +358,13 @@ impl<K: TransactionKind> DbTx for Tx<K> {
}
// Iterate over read only values in database.
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
self.new_cursor()
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError> {
self.new_cursor_cached()
}
/// Iterate over read only values in database.
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
self.new_cursor()
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError> {
self.new_cursor_cached()
}
/// Returns number of entries in the table using cheap DB stats invocation.
@@ -401,8 +436,14 @@ impl Tx<RW> {
}
impl DbTxMut for Tx<RW> {
type CursorMut<T: Table> = Cursor<RW, T>;
type DupCursorMut<T: DupSort> = Cursor<RW, T>;
type CursorMut<'tx, T: Table>
= Cursor<'tx, RW, T>
where
Self: 'tx;
type DupCursorMut<'tx, T: DupSort>
= Cursor<'tx, RW, T>
where
Self: 'tx;
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
self.put::<T>(PutKind::Upsert, key, value)
@@ -436,12 +477,12 @@ impl DbTxMut for Tx<RW> {
Ok(())
}
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
self.new_cursor()
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError> {
self.new_cursor_cached()
}
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
self.new_cursor()
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError> {
self.new_cursor_cached()
}
}

View File

@@ -32,7 +32,7 @@ fn bench_get_seq_iter(c: &mut Criterion) {
count += 1;
}
fn iterate<K: TransactionKind>(cursor: &mut Cursor<K>) -> Result<()> {
fn iterate<K: TransactionKind>(cursor: &mut Cursor<'_, K>) -> Result<()> {
let mut i = 0;
for result in cursor.iter::<ObjectLength, ObjectLength>() {
let (key_len, data_len) = result?;

View File

@@ -14,26 +14,28 @@ use ffi::{
use std::{borrow::Cow, ffi::c_void, fmt, marker::PhantomData, mem, ptr};
/// A cursor for navigating the items within a database.
pub struct Cursor<K>
pub struct Cursor<'tx, K>
where
K: TransactionKind,
{
txn: Transaction<K>,
txn: &'tx Transaction<K>,
cursor: *mut ffi::MDBX_cursor,
/// Marker to make this type `!Send` and `!Sync`.
_marker: PhantomData<*const ()>,
}
impl<K> Cursor<K>
impl<'tx, K> Cursor<'tx, K>
where
K: TransactionKind,
{
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
pub(crate) fn new(txn: &'tx Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
txn.txn_execute(|txn_ptr| {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
})??;
}
Ok(Self { txn, cursor })
Ok(Self { txn, cursor, _marker: PhantomData })
}
fn new_at_position(other: &Self) -> Result<Self> {
@@ -42,7 +44,7 @@ where
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let s = Self { txn: other.txn.clone(), cursor };
let s = Self { txn: other.txn, cursor, _marker: PhantomData };
mdbx_result(res)?;
@@ -58,14 +60,43 @@ where
self.cursor
}
/// Consumes the cursor and returns the raw pointer without closing it.
///
/// This is useful for cursor caching - the raw pointer can be stored and
/// later reconstituted with [`Cursor::from_raw`].
///
/// # Safety
///
/// The caller takes ownership of the raw cursor pointer and is responsible for:
/// - Either closing it via `mdbx_cursor_close`, or
/// - Reconstituting it via [`Cursor::from_raw`] before the transaction ends
pub const fn into_raw(self) -> *mut ffi::MDBX_cursor {
let ptr = self.cursor;
mem::forget(self);
ptr
}
/// Reconstitutes a cursor from a raw pointer.
///
/// # Safety
///
/// The caller must ensure:
/// - The pointer was obtained from [`Cursor::into_raw`]
/// - The pointer was created for this same transaction (or a prior transaction on the same
/// environment that has been renewed)
/// - The pointer has not been closed or used to create another Cursor
pub const unsafe fn from_raw(txn: &'tx Transaction<K>, cursor: *mut ffi::MDBX_cursor) -> Self {
Self { txn, cursor, _marker: PhantomData }
}
/// Returns an iterator over the raw key value slices.
pub fn iter_slices<'a>(self) -> IntoIter<K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
pub fn iter_slices<'a>(self) -> IntoIter<'tx, K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
self.into_iter()
}
/// Returns an iterator over database items.
#[expect(clippy::should_implement_trait)]
pub fn into_iter<Key, Value>(self) -> IntoIter<K, Key, Value>
pub fn into_iter<Key, Value>(self) -> IntoIter<'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -338,7 +369,7 @@ where
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
/// duplicate data items of each key will be returned before moving on to
/// the next key.
pub fn iter<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
pub fn iter<Key, Value>(&mut self) -> Iter<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -351,7 +382,7 @@ where
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
/// duplicate data items of each key will be returned before moving on to
/// the next key.
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -364,7 +395,7 @@ where
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
/// duplicate data items of each key will be returned before moving on to
/// the next key.
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -379,7 +410,7 @@ where
/// Iterate over duplicate database items. The iterator will begin with the
/// item next after the cursor, and continue until the end of the database.
/// Each item will be returned as an iterator of its duplicates.
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -389,7 +420,7 @@ where
/// Iterate over duplicate database items starting from the beginning of the
/// database. Each item will be returned as an iterator of its duplicates.
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -399,7 +430,7 @@ where
/// Iterate over duplicate items in the database starting from the given
/// key. Each item will be returned as an iterator of its duplicates.
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, K, Key, Value>
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -412,7 +443,7 @@ where
}
/// Iterate over the duplicates of the item in the database with the given key.
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, 'tx, K, Key, Value>
where
Key: TableObject,
Value: TableObject,
@@ -430,7 +461,7 @@ where
}
}
impl Cursor<RW> {
impl Cursor<'_, RW> {
/// Puts a key/data pair into the database. The cursor will be positioned at
/// the new data item, or on failure usually near it.
pub fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
@@ -462,7 +493,7 @@ impl Cursor<RW> {
}
}
impl<K> Clone for Cursor<K>
impl<K> Clone for Cursor<'_, K>
where
K: TransactionKind,
{
@@ -471,7 +502,7 @@ where
}
}
impl<K> fmt::Debug for Cursor<K>
impl<K> fmt::Debug for Cursor<'_, K>
where
K: TransactionKind,
{
@@ -480,7 +511,7 @@ where
}
}
impl<K> Drop for Cursor<K>
impl<K> Drop for Cursor<'_, K>
where
K: TransactionKind,
{
@@ -502,12 +533,9 @@ const unsafe fn slice_to_val(slice: Option<&[u8]>) -> ffi::MDBX_val {
}
}
unsafe impl<K> Send for Cursor<K> where K: TransactionKind {}
unsafe impl<K> Sync for Cursor<K> where K: TransactionKind {}
/// An iterator over the key/value pairs in an MDBX database.
#[derive(Debug)]
pub enum IntoIter<K, Key, Value>
pub enum IntoIter<'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -526,7 +554,7 @@ where
/// fails for some reason.
Ok {
/// The MDBX cursor with which to iterate.
cursor: Cursor<K>,
cursor: Cursor<'tx, K>,
/// The first operation to perform when the consumer calls [`Iter::next()`].
op: ffi::MDBX_cursor_op,
@@ -538,19 +566,19 @@ where
},
}
impl<K, Key, Value> IntoIter<K, Key, Value>
impl<'tx, K, Key, Value> IntoIter<'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
{
/// Creates a new iterator backed by the given cursor.
fn new(cursor: Cursor<K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
fn new(cursor: Cursor<'tx, K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
Self::Ok { cursor, op, next_op, _marker: Default::default() }
}
}
impl<K, Key, Value> Iterator for IntoIter<K, Key, Value>
impl<K, Key, Value> Iterator for IntoIter<'_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -598,7 +626,7 @@ where
/// An iterator over the key/value pairs in an MDBX database.
#[derive(Debug)]
pub enum Iter<'cur, K, Key, Value>
pub enum Iter<'cur, 'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -617,7 +645,7 @@ where
/// fails for some reason.
Ok {
/// The MDBX cursor with which to iterate.
cursor: &'cur mut Cursor<K>,
cursor: &'cur mut Cursor<'tx, K>,
/// The first operation to perform when the consumer calls [`Iter::next()`].
op: ffi::MDBX_cursor_op,
@@ -629,7 +657,7 @@ where
},
}
impl<'cur, K, Key, Value> Iter<'cur, K, Key, Value>
impl<'cur, 'tx, K, Key, Value> Iter<'cur, 'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -637,7 +665,7 @@ where
{
/// Creates a new iterator backed by the given cursor.
fn new(
cursor: &'cur mut Cursor<K>,
cursor: &'cur mut Cursor<'tx, K>,
op: ffi::MDBX_cursor_op,
next_op: ffi::MDBX_cursor_op,
) -> Self {
@@ -645,7 +673,7 @@ where
}
}
impl<K, Key, Value> Iterator for Iter<'_, K, Key, Value>
impl<K, Key, Value> Iterator for Iter<'_, '_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -695,7 +723,7 @@ where
///
/// The yielded items of the iterator are themselves iterators over the duplicate values for a
/// specific key.
pub enum IterDup<'cur, K, Key, Value>
pub enum IterDup<'cur, 'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -714,7 +742,7 @@ where
/// fails for some reason.
Ok {
/// The MDBX cursor with which to iterate.
cursor: &'cur mut Cursor<K>,
cursor: &'cur mut Cursor<'tx, K>,
/// The first operation to perform when the consumer calls `Iter.next()`.
op: MDBX_cursor_op,
@@ -723,19 +751,19 @@ where
},
}
impl<'cur, K, Key, Value> IterDup<'cur, K, Key, Value>
impl<'cur, 'tx, K, Key, Value> IterDup<'cur, 'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
{
/// Creates a new iterator backed by the given cursor.
fn new(cursor: &'cur mut Cursor<K>, op: MDBX_cursor_op) -> Self {
fn new(cursor: &'cur mut Cursor<'tx, K>, op: MDBX_cursor_op) -> Self {
IterDup::Ok { cursor, op, _marker: Default::default() }
}
}
impl<K, Key, Value> fmt::Debug for IterDup<'_, K, Key, Value>
impl<K, Key, Value> fmt::Debug for IterDup<'_, '_, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
@@ -746,13 +774,13 @@ where
}
}
impl<K, Key, Value> Iterator for IterDup<'_, K, Key, Value>
impl<'tx, K, Key, Value> Iterator for IterDup<'_, 'tx, K, Key, Value>
where
K: TransactionKind,
Key: TableObject,
Value: TableObject,
{
type Item = IntoIter<K, Key, Value>;
type Item = IntoIter<'tx, K, Key, Value>;
fn next(&mut self) -> Option<Self::Item> {
match self {
@@ -767,7 +795,7 @@ where
(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
Cursor::new_at_position(cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)

View File

@@ -13,7 +13,7 @@ pub extern crate reth_mdbx_sys as ffi;
pub use crate::{
codec::*,
cursor::{Cursor, Iter, IterDup},
cursor::{Cursor, IntoIter, Iter, IterDup},
database::Database,
environment::{
Environment, EnvironmentBuilder, EnvironmentKind, Geometry, HandleSlowReadersCallback,

View File

@@ -253,13 +253,13 @@ where
}
/// Open a new cursor on the given database.
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
Cursor::new(self.clone(), dbi)
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<'_, K>> {
Cursor::new(self, dbi)
}
/// Open a new cursor on the given dbi.
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
Cursor::new(self.clone(), dbi)
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<'_, K>> {
Cursor::new(self, dbi)
}
/// Disables a timeout for this read transaction.

View File

@@ -37,27 +37,30 @@ use reth_storage_errors::provider::ProviderResult;
use strum::{Display, EnumIs};
/// Type alias for [`EitherReader`] constructors.
type EitherReaderTy<'a, P, T> =
EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
type EitherReaderTy<'a, P, T> = EitherReader<
'a,
CursorTy<'a, <P as DBProvider>::Tx, T>,
<P as NodePrimitivesProvider>::Primitives,
>;
/// Type alias for [`EitherReader`] constructors.
type DupEitherReaderTy<'a, P, T> = EitherReader<
'a,
DupCursorTy<<P as DBProvider>::Tx, T>,
DupCursorTy<'a, <P as DBProvider>::Tx, T>,
<P as NodePrimitivesProvider>::Primitives,
>;
/// Type alias for dup [`EitherWriter`] constructors.
type DupEitherWriterTy<'a, P, T> = EitherWriter<
'a,
DupCursorMutTy<<P as DBProvider>::Tx, T>,
DupCursorMutTy<'a, <P as DBProvider>::Tx, T>,
<P as NodePrimitivesProvider>::Primitives,
>;
/// Type alias for [`EitherWriter`] constructors.
type EitherWriterTy<'a, P, T> = EitherWriter<
'a,
CursorMutTy<<P as DBProvider>::Tx, T>,
CursorMutTy<'a, <P as DBProvider>::Tx, T>,
<P as NodePrimitivesProvider>::Primitives,
>;
@@ -247,7 +250,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
provider: &'a P,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
@@ -264,7 +267,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
provider: &'a P,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
@@ -283,7 +286,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
provider: &'a P,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
@@ -738,7 +741,7 @@ pub enum EitherReader<'a, CURSOR, N> {
impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for senders based on storage settings.
pub fn new_senders<P>(
provider: &P,
provider: &'a P,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
@@ -756,7 +759,7 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
provider: &'a P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
where
@@ -778,7 +781,7 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
provider: &'a P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
where
@@ -800,7 +803,7 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
provider: &'a P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
where
@@ -822,7 +825,7 @@ impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for account changesets based on storage settings.
pub fn new_account_changesets<P>(
provider: &P,
provider: &'a P,
) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,

View File

@@ -4008,57 +4008,59 @@ mod tests {
assert_eq!(num_entries, 5);
// Verify account trie updates were written correctly
let tx = provider_rw.tx_ref();
let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
{
let tx = provider_rw.tx_ref();
let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
// Check first account node was updated
let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
let entry1 = cursor.seek_exact(nibbles1).unwrap();
assert!(entry1.is_some(), "Updated account node should exist");
let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
assert_eq!(
entry1.unwrap().1.state_mask,
expected_mask,
"Account node should have updated state_mask"
);
// Check first account node was updated
let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
let entry1 = cursor.seek_exact(nibbles1).unwrap();
assert!(entry1.is_some(), "Updated account node should exist");
let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
assert_eq!(
entry1.unwrap().1.state_mask,
expected_mask,
"Account node should have updated state_mask"
);
// Check deleted account node no longer exists
let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
let entry2 = cursor.seek_exact(nibbles2).unwrap();
assert!(entry2.is_none(), "Deleted account node should not exist");
// Check deleted account node no longer exists
let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
let entry2 = cursor.seek_exact(nibbles2).unwrap();
assert!(entry2.is_none(), "Deleted account node should not exist");
// Check new account node exists
let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
let entry3 = cursor.seek_exact(nibbles3).unwrap();
assert!(entry3.is_some(), "New account node should exist");
// Check new account node exists
let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
let entry3 = cursor.seek_exact(nibbles3).unwrap();
assert!(entry3.is_some(), "New account node should exist");
// Verify storage trie updates were written correctly
let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
// Verify storage trie updates were written correctly
let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
// Check storage for address1
let storage_entries1: Vec<_> = storage_cursor
.walk_dup(Some(storage_address1), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(
storage_entries1.len(),
1,
"Storage address1 should have 1 entry after deletion"
);
assert_eq!(
storage_entries1[0].1.nibbles.0,
Nibbles::from_nibbles([0x1, 0x0]),
"Remaining entry should be [0x1, 0x0]"
);
// Check storage for address1
let storage_entries1: Vec<_> = storage_cursor
.walk_dup(Some(storage_address1), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(
storage_entries1.len(),
1,
"Storage address1 should have 1 entry after deletion"
);
assert_eq!(
storage_entries1[0].1.nibbles.0,
Nibbles::from_nibbles([0x1, 0x0]),
"Remaining entry should be [0x1, 0x0]"
);
// Check storage for address2 was wiped
let storage_entries2: Vec<_> = storage_cursor
.walk_dup(Some(storage_address2), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
// Check storage for address2 was wiped
let storage_entries2: Vec<_> = storage_cursor
.walk_dup(Some(storage_address2), None)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
}
provider_rw.commit().unwrap();
}

View File

@@ -2,7 +2,7 @@ use alloy_primitives::{BlockNumber, B256};
use metrics::{Counter, Histogram};
use parking_lot::RwLock;
use reth_chain_state::LazyOverlay;
use reth_db_api::DatabaseError;
use reth_db_api::{tables, transaction::DbTx, DatabaseError};
use reth_errors::{ProviderError, ProviderResult};
use reth_metrics::Metrics;
use reth_prune_types::PruneSegment;
@@ -13,13 +13,15 @@ use reth_storage_api::{
StorageChangeSetReader,
};
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
hashed_cursor::{HashedCursorFactory, HashedPostStateCursor, HashedPostStateCursorFactory},
trie_cursor::{InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdatesSorted,
HashedPostStateSorted, KeccakKeyHasher,
};
use reth_trie_db::{
ChangesetCache, DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
ChangesetCache, DatabaseAccountTrieCursor, DatabaseHashedAccountCursor,
DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseHashedStorageCursor,
DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
};
use std::{
collections::{hash_map::Entry, HashMap},
@@ -527,20 +529,21 @@ where
Self: 'a;
fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
let trie_cursor_factory =
InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
trie_cursor_factory.account_trie_cursor()
let db_cursor = DatabaseAccountTrieCursor::new(
self.provider.tx_ref().cursor_read::<tables::AccountsTrie>()?,
);
Ok(InMemoryTrieCursor::new_account(db_cursor, self.trie_updates.as_ref()))
}
fn storage_trie_cursor(
&self,
hashed_address: B256,
) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
let trie_cursor_factory =
InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
trie_cursor_factory.storage_trie_cursor(hashed_address)
let db_cursor = DatabaseStorageTrieCursor::new(
self.provider.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?,
hashed_address,
);
Ok(InMemoryTrieCursor::new_storage(db_cursor, self.trie_updates.as_ref(), hashed_address))
}
}
@@ -565,19 +568,24 @@ where
Self: 'a;
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
let hashed_cursor_factory =
HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
hashed_cursor_factory.hashed_account_cursor()
let db_cursor = DatabaseHashedAccountCursor::new(
self.provider.tx_ref().cursor_read::<tables::HashedAccounts>()?,
);
Ok(HashedPostStateCursor::new_account(db_cursor, self.hashed_post_state.as_ref()))
}
fn hashed_storage_cursor(
&self,
hashed_address: B256,
) -> Result<Self::StorageCursor<'_>, DatabaseError> {
let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
let hashed_cursor_factory =
HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
hashed_cursor_factory.hashed_storage_cursor(hashed_address)
let db_cursor = DatabaseHashedStorageCursor::new(
self.provider.tx_ref().cursor_dup_read::<tables::HashedStorages>()?,
hashed_address,
);
Ok(HashedPostStateCursor::new_storage(
db_cursor,
self.hashed_post_state.as_ref(),
hashed_address,
))
}
}

View File

@@ -21,11 +21,11 @@ impl<T> DatabaseHashedCursorFactory<T> {
impl<TX: DbTx> HashedCursorFactory for DatabaseHashedCursorFactory<&TX> {
type AccountCursor<'a>
= DatabaseHashedAccountCursor<<TX as DbTx>::Cursor<tables::HashedAccounts>>
= DatabaseHashedAccountCursor<<TX as DbTx>::Cursor<'a, tables::HashedAccounts>>
where
Self: 'a;
type StorageCursor<'a>
= DatabaseHashedStorageCursor<<TX as DbTx>::DupCursor<tables::HashedStorages>>
= DatabaseHashedStorageCursor<<TX as DbTx>::DupCursor<'a, tables::HashedStorages>>
where
Self: 'a;

View File

@@ -27,12 +27,12 @@ where
TX: DbTx,
{
type AccountTrieCursor<'a>
= DatabaseAccountTrieCursor<<TX as DbTx>::Cursor<tables::AccountsTrie>>
= DatabaseAccountTrieCursor<<TX as DbTx>::Cursor<'a, tables::AccountsTrie>>
where
Self: 'a;
type StorageTrieCursor<'a>
= DatabaseStorageTrieCursor<<TX as DbTx>::DupCursor<tables::StoragesTrie>>
= DatabaseStorageTrieCursor<<TX as DbTx>::DupCursor<'a, tables::StoragesTrie>>
where
Self: 'a;
@@ -64,7 +64,7 @@ impl<C> DatabaseAccountTrieCursor<C> {
impl<C> TrieCursor for DatabaseAccountTrieCursor<C>
where
C: DbCursorRO<tables::AccountsTrie> + Send,
C: DbCursorRO<tables::AccountsTrie>,
{
/// Seeks an exact match for the provided key in the account trie.
fn seek_exact(
@@ -160,7 +160,7 @@ where
impl<C> TrieCursor for DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie>,
{
/// Seeks an exact match for the given key in the storage trie.
fn seek_exact(
@@ -202,7 +202,7 @@ where
impl<C> TrieStorageCursor for DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie>,
{
fn set_hashed_address(&mut self, hashed_address: B256) {
self.hashed_address = hashed_address;