mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
8 Commits
push
...
tempo/curs
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa087ac553 | ||
|
|
ede72ca909 | ||
|
|
d17f99330d | ||
|
|
747a6409c1 | ||
|
|
abccf7907d | ||
|
|
b1ee1f812d | ||
|
|
8f20c328ff | ||
|
|
103588893d |
@@ -199,8 +199,8 @@ fn verify_checkpoints(provider: impl StageCheckpointReader) -> eyre::Result<()>
|
||||
}
|
||||
|
||||
fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()> {
|
||||
// Get a read-write database provider
|
||||
let mut provider_rw = tool.provider_factory.provider_rw()?;
|
||||
// Get a read-write database provider for checkpoints and committing
|
||||
let provider_rw = tool.provider_factory.provider_rw()?;
|
||||
|
||||
// Log the database block tip from Finish stage checkpoint
|
||||
let finish_checkpoint = provider_rw.get_stage_checkpoint(StageId::Finish)?.unwrap_or_default();
|
||||
@@ -209,17 +209,21 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
|
||||
// Check that a pipeline sync isn't in progress.
|
||||
verify_checkpoints(provider_rw.as_ref())?;
|
||||
|
||||
// Create cursors for making modifications with
|
||||
let tx = provider_rw.tx_mut();
|
||||
tx.disable_long_read_transaction_safety();
|
||||
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
|
||||
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
// Get the database reference for creating transactions
|
||||
let db = tool.provider_factory.db_ref();
|
||||
|
||||
// Create the cursor factories. These cannot accept the `&mut` tx above because they require it
|
||||
// to be AsRef.
|
||||
let tx = provider_rw.tx_ref();
|
||||
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(tx);
|
||||
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
|
||||
// Create a read-only transaction for the Verifier
|
||||
let mut read_tx = db.tx()?;
|
||||
read_tx.disable_long_read_transaction_safety();
|
||||
|
||||
// Create a read-write transaction for the write cursors
|
||||
let write_tx = db.tx_mut()?;
|
||||
let mut account_trie_cursor = write_tx.cursor_write::<tables::AccountsTrie>()?;
|
||||
let mut storage_trie_cursor = write_tx.cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
|
||||
// Create the cursor factories using the read-only transaction
|
||||
let hashed_cursor_factory = DatabaseHashedCursorFactory::new(&read_tx);
|
||||
let trie_cursor_factory = DatabaseTrieCursorFactory::new(&read_tx);
|
||||
|
||||
// Create the verifier
|
||||
let verifier = Verifier::new(&trie_cursor_factory, hashed_cursor_factory)?;
|
||||
@@ -304,10 +308,15 @@ fn verify_and_repair<N: ProviderNodeTypes>(tool: &DbTool<N>) -> eyre::Result<()>
|
||||
}
|
||||
}
|
||||
|
||||
// Drop read transaction and cursors before committing
|
||||
drop(account_trie_cursor);
|
||||
drop(storage_trie_cursor);
|
||||
drop(read_tx);
|
||||
|
||||
if inconsistent_nodes == 0 {
|
||||
info!("No inconsistencies found");
|
||||
} else {
|
||||
provider_rw.commit()?;
|
||||
write_tx.commit()?;
|
||||
info!("Repaired {} inconsistencies and committed changes", inconsistent_nodes);
|
||||
}
|
||||
|
||||
|
||||
@@ -116,7 +116,7 @@ pub(crate) trait DbTxPruneExt: DbTxMut + DbTx {
|
||||
/// timing out.
|
||||
fn prune_table_with_range_step<T: Table>(
|
||||
&self,
|
||||
walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
|
||||
walker: &mut RangeWalker<'_, T, Self::CursorMut<'_, T>>,
|
||||
limiter: &mut PruneLimiter,
|
||||
skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
|
||||
delete_callback: &mut impl FnMut(TableRow<T>),
|
||||
|
||||
@@ -66,8 +66,14 @@ pub struct TxMock {
|
||||
}
|
||||
|
||||
impl DbTx for TxMock {
|
||||
type Cursor<T: Table> = CursorMock;
|
||||
type DupCursor<T: DupSort> = CursorMock;
|
||||
type Cursor<'tx, T: Table>
|
||||
= CursorMock
|
||||
where
|
||||
Self: 'tx;
|
||||
type DupCursor<'tx, T: DupSort>
|
||||
= CursorMock
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
/// Retrieves a value by key from the specified table.
|
||||
///
|
||||
@@ -108,7 +114,7 @@ impl DbTx for TxMock {
|
||||
///
|
||||
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
|
||||
/// iterate over any data (all cursor operations return `None`).
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError> {
|
||||
Ok(CursorMock { _cursor: 0 })
|
||||
}
|
||||
|
||||
@@ -116,7 +122,7 @@ impl DbTx for TxMock {
|
||||
///
|
||||
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
|
||||
/// iterate over any data (all cursor operations return `None`).
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError> {
|
||||
Ok(CursorMock { _cursor: 0 })
|
||||
}
|
||||
|
||||
@@ -136,8 +142,14 @@ impl DbTx for TxMock {
|
||||
}
|
||||
|
||||
impl DbTxMut for TxMock {
|
||||
type CursorMut<T: Table> = CursorMock;
|
||||
type DupCursorMut<T: DupSort> = CursorMock;
|
||||
type CursorMut<'tx, T: Table>
|
||||
= CursorMock
|
||||
where
|
||||
Self: 'tx;
|
||||
type DupCursorMut<'tx, T: DupSort>
|
||||
= CursorMock
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
/// Inserts or updates a key-value pair in the specified table.
|
||||
///
|
||||
@@ -172,7 +184,7 @@ impl DbTxMut for TxMock {
|
||||
///
|
||||
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
|
||||
/// iterate over any data and all write operations will be no-ops.
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError> {
|
||||
Ok(CursorMock { _cursor: 0 })
|
||||
}
|
||||
|
||||
@@ -180,7 +192,7 @@ impl DbTxMut for TxMock {
|
||||
///
|
||||
/// **Mock behavior**: Returns a default [`CursorMock`] that will not
|
||||
/// iterate over any data and all write operations will be no-ops.
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError> {
|
||||
Ok(CursorMock { _cursor: 0 })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,6 +121,17 @@ pub trait Table: Send + Sync + Debug + 'static {
|
||||
/// Whether the table is also a `DUPSORT` table.
|
||||
const DUPSORT: bool;
|
||||
|
||||
/// Unique index of this table for O(1) cache slot access.
|
||||
///
|
||||
/// This is automatically generated by the `tables!` macro based on declaration order.
|
||||
///
|
||||
/// # Stability
|
||||
///
|
||||
/// **This index is NOT stable across versions.** It may change when tables are added,
|
||||
/// removed, or reordered. Use only for runtime operations (e.g., cursor caching),
|
||||
/// never for persistent storage or cross-process communication.
|
||||
const INDEX: usize;
|
||||
|
||||
/// Key element of `Table`.
|
||||
///
|
||||
/// Sorting should be taken into account when encoding this.
|
||||
|
||||
@@ -124,6 +124,8 @@ macro_rules! tables {
|
||||
concat!("`", stringify!($value), "`")
|
||||
};
|
||||
|
||||
|
||||
|
||||
($($(#[$attr:meta])* table $name:ident$(<$($generic:ident $(= $default:ty)?),*>)? { type Key = $key:ty; type Value = $value:ty; $(type SubKey = $subkey:ty;)? } )*) => {
|
||||
// Table marker types.
|
||||
$(
|
||||
@@ -152,6 +154,7 @@ macro_rules! tables {
|
||||
{
|
||||
const NAME: &'static str = table_names::$name;
|
||||
const DUPSORT: bool = tables!(@bool $($subkey)?);
|
||||
const INDEX: usize = Tables::$name.index();
|
||||
|
||||
type Key = $key;
|
||||
type Value = $value;
|
||||
@@ -182,6 +185,30 @@ macro_rules! tables {
|
||||
/// The number of tables in the database.
|
||||
pub const COUNT: usize = Self::ALL.len();
|
||||
|
||||
/// Returns the unique index of this table (0..COUNT).
|
||||
///
|
||||
/// This is used for O(1) cursor cache slot access.
|
||||
///
|
||||
/// # Stability
|
||||
///
|
||||
/// **This index is NOT stable across versions.** It is based on declaration
|
||||
/// order in the `tables!` macro and may change when tables are added, removed,
|
||||
/// or reordered. Use only for runtime operations, never for persistent storage.
|
||||
#[allow(clippy::enum_glob_use)]
|
||||
pub const fn index(&self) -> usize {
|
||||
use Tables::*;
|
||||
// Generate a match with incrementing indices
|
||||
let mut idx = 0usize;
|
||||
$(
|
||||
if matches!(self, $name) {
|
||||
return idx;
|
||||
}
|
||||
idx += 1;
|
||||
)*
|
||||
let _ = idx; // suppress unused warning
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Returns the name of the table as a string.
|
||||
pub const fn name(&self) -> &'static str {
|
||||
match self {
|
||||
|
||||
@@ -17,6 +17,7 @@ pub struct RawTable<T: Table> {
|
||||
impl<T: Table> Table for RawTable<T> {
|
||||
const NAME: &'static str = T::NAME;
|
||||
const DUPSORT: bool = false;
|
||||
const INDEX: usize = T::INDEX;
|
||||
|
||||
type Key = RawKey<T::Key>;
|
||||
type Value = RawValue<T::Value>;
|
||||
@@ -32,6 +33,7 @@ pub struct RawDupSort<T: DupSort> {
|
||||
impl<T: DupSort> Table for RawDupSort<T> {
|
||||
const NAME: &'static str = T::NAME;
|
||||
const DUPSORT: bool = true;
|
||||
const INDEX: usize = T::INDEX;
|
||||
|
||||
type Key = RawKey<T::Key>;
|
||||
type Value = RawValue<T::Value>;
|
||||
|
||||
@@ -6,23 +6,27 @@ use crate::{
|
||||
use std::fmt::Debug;
|
||||
|
||||
/// Helper adapter type for accessing [`DbTx`] cursor.
|
||||
pub type CursorTy<TX, T> = <TX as DbTx>::Cursor<T>;
|
||||
pub type CursorTy<'tx, TX, T> = <TX as DbTx>::Cursor<'tx, T>;
|
||||
|
||||
/// Helper adapter type for accessing [`DbTx`] dup cursor.
|
||||
pub type DupCursorTy<TX, T> = <TX as DbTx>::DupCursor<T>;
|
||||
pub type DupCursorTy<'tx, TX, T> = <TX as DbTx>::DupCursor<'tx, T>;
|
||||
|
||||
/// Helper adapter type for accessing [`DbTxMut`] mutable cursor.
|
||||
pub type CursorMutTy<TX, T> = <TX as DbTxMut>::CursorMut<T>;
|
||||
pub type CursorMutTy<'tx, TX, T> = <TX as DbTxMut>::CursorMut<'tx, T>;
|
||||
|
||||
/// Helper adapter type for accessing [`DbTxMut`] mutable dup cursor.
|
||||
pub type DupCursorMutTy<TX, T> = <TX as DbTxMut>::DupCursorMut<T>;
|
||||
pub type DupCursorMutTy<'tx, TX, T> = <TX as DbTxMut>::DupCursorMut<'tx, T>;
|
||||
|
||||
/// Read only transaction
|
||||
pub trait DbTx: Debug + Send {
|
||||
/// Cursor type for this read-only transaction
|
||||
type Cursor<T: Table>: DbCursorRO<T> + Send;
|
||||
type Cursor<'tx, T: Table>: DbCursorRO<T>
|
||||
where
|
||||
Self: 'tx;
|
||||
/// `DupCursor` type for this read-only transaction
|
||||
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send;
|
||||
type DupCursor<'tx, T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T>
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
/// Get value by an owned key
|
||||
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, DatabaseError>;
|
||||
@@ -39,9 +43,9 @@ pub trait DbTx: Debug + Send {
|
||||
/// Aborts transaction
|
||||
fn abort(self);
|
||||
/// Iterate over read only values in table.
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError>;
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError>;
|
||||
/// Iterate over read only values in dup sorted table.
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError>;
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError>;
|
||||
/// Returns number of entries in the table.
|
||||
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
|
||||
/// Disables long-lived read transaction safety guarantees.
|
||||
@@ -51,13 +55,16 @@ pub trait DbTx: Debug + Send {
|
||||
/// Read write transaction that allows writing to database
|
||||
pub trait DbTxMut: Send {
|
||||
/// Read-Write Cursor type
|
||||
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send;
|
||||
type CursorMut<'tx, T: Table>: DbCursorRW<T> + DbCursorRO<T>
|
||||
where
|
||||
Self: 'tx;
|
||||
/// Read-Write `DupCursor` type
|
||||
type DupCursorMut<T: DupSort>: DbDupCursorRW<T>
|
||||
type DupCursorMut<'tx, T: DupSort>: DbDupCursorRW<T>
|
||||
+ DbCursorRW<T>
|
||||
+ DbDupCursorRO<T>
|
||||
+ DbCursorRO<T>
|
||||
+ Send;
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
/// Put value to database
|
||||
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
|
||||
@@ -73,7 +80,7 @@ pub trait DbTxMut: Send {
|
||||
/// Clears database.
|
||||
fn clear<T: Table>(&self) -> Result<(), DatabaseError>;
|
||||
/// Cursor mut
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError>;
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError>;
|
||||
/// `DupCursor` mut.
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError>;
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError>;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Cursor wrapper for libmdbx-sys.
|
||||
|
||||
use super::utils::*;
|
||||
use super::{cursor_cache::CursorCache, utils::*};
|
||||
use crate::{
|
||||
metrics::{DatabaseEnvMetrics, Operation},
|
||||
DatabaseError,
|
||||
@@ -13,34 +13,56 @@ use reth_db_api::{
|
||||
},
|
||||
table::{Compress, Decode, Decompress, DupSort, Encode, IntoVec, Table},
|
||||
};
|
||||
use reth_libmdbx::{Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
|
||||
use reth_libmdbx::{ffi, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
|
||||
use reth_storage_errors::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
|
||||
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
|
||||
|
||||
/// Read only Cursor.
|
||||
pub type CursorRO<T> = Cursor<RO, T>;
|
||||
pub type CursorRO<'tx, T> = Cursor<'tx, RO, T>;
|
||||
/// Read write cursor.
|
||||
pub type CursorRW<T> = Cursor<RW, T>;
|
||||
pub type CursorRW<'tx, T> = Cursor<'tx, RW, T>;
|
||||
|
||||
/// Cursor wrapper to access KV items.
|
||||
///
|
||||
/// When dropped, the cursor is returned to the transaction's cursor cache for reuse,
|
||||
/// rather than being closed. This reduces the overhead of cursor creation in
|
||||
/// cursor-heavy workloads.
|
||||
#[derive(Debug)]
|
||||
pub struct Cursor<K: TransactionKind, T: Table> {
|
||||
/// Inner `libmdbx` cursor.
|
||||
pub(crate) inner: reth_libmdbx::Cursor<K>,
|
||||
pub struct Cursor<'tx, K: TransactionKind, T: Table> {
|
||||
/// Inner `libmdbx` cursor. Wrapped in Option so we can take it in Drop.
|
||||
pub(crate) inner: Option<reth_libmdbx::Cursor<'tx, K>>,
|
||||
/// Cache buffer that receives compressed values.
|
||||
buf: Vec<u8>,
|
||||
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
/// Reference to the cursor cache where this cursor should be returned on drop.
|
||||
/// If `None`, the cursor is closed normally on drop.
|
||||
cache: Option<&'tx CursorCache>,
|
||||
/// Phantom data to enforce encoding/decoding.
|
||||
_dbi: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
pub(crate) const fn new_with_metrics(
|
||||
inner: reth_libmdbx::Cursor<K>,
|
||||
#[expect(clippy::missing_const_for_fn)]
|
||||
impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> {
|
||||
pub(crate) fn new_with_metrics(
|
||||
inner: reth_libmdbx::Cursor<'tx, K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
) -> Self {
|
||||
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
|
||||
Self { inner: Some(inner), buf: Vec::new(), metrics, cache: None, _dbi: PhantomData }
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_cache(
|
||||
inner: reth_libmdbx::Cursor<'tx, K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
cache: &'tx CursorCache,
|
||||
) -> Self {
|
||||
Self { inner: Some(inner), buf: Vec::new(), metrics, cache: Some(cache), _dbi: PhantomData }
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the inner cursor.
|
||||
#[inline]
|
||||
fn inner_mut(&mut self) -> &mut reth_libmdbx::Cursor<'tx, K> {
|
||||
self.inner.as_mut().expect("cursor already taken")
|
||||
}
|
||||
|
||||
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
|
||||
@@ -61,6 +83,25 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: TransactionKind, T: Table> Drop for Cursor<'_, K, T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() &&
|
||||
let Some(cache) = self.cache
|
||||
{
|
||||
// Return cursor to cache for reuse
|
||||
let raw = inner.into_raw();
|
||||
let old = cache.put(T::INDEX, raw);
|
||||
// If there was already a cursor in the slot (shouldn't happen), close it
|
||||
if let Some(old_cursor) = old {
|
||||
unsafe {
|
||||
ffi::mdbx_cursor_close(old_cursor);
|
||||
}
|
||||
}
|
||||
}
|
||||
// If no cache or no inner, the cursor drops normally and closes itself
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes a `(key, value)` pair from the database.
|
||||
#[expect(clippy::type_complexity)]
|
||||
pub fn decode<T>(
|
||||
@@ -88,38 +129,38 @@ macro_rules! compress_to_buf_or_ref {
|
||||
};
|
||||
}
|
||||
|
||||
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<'_, K, T> {
|
||||
fn first(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.first())
|
||||
decode::<T>(self.inner_mut().first())
|
||||
}
|
||||
|
||||
fn seek_exact(&mut self, key: <T as Table>::Key) -> PairResult<T> {
|
||||
decode::<T>(self.inner.set_key(key.encode().as_ref()))
|
||||
decode::<T>(self.inner_mut().set_key(key.encode().as_ref()))
|
||||
}
|
||||
|
||||
fn seek(&mut self, key: <T as Table>::Key) -> PairResult<T> {
|
||||
decode::<T>(self.inner.set_range(key.encode().as_ref()))
|
||||
decode::<T>(self.inner_mut().set_range(key.encode().as_ref()))
|
||||
}
|
||||
|
||||
fn next(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next())
|
||||
decode::<T>(self.inner_mut().next())
|
||||
}
|
||||
|
||||
fn prev(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.prev())
|
||||
decode::<T>(self.inner_mut().prev())
|
||||
}
|
||||
|
||||
fn last(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.last())
|
||||
decode::<T>(self.inner_mut().last())
|
||||
}
|
||||
|
||||
fn current(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.get_current())
|
||||
decode::<T>(self.inner_mut().get_current())
|
||||
}
|
||||
|
||||
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
|
||||
let start = if let Some(start_key) = start_key {
|
||||
decode::<T>(self.inner.set_range(start_key.encode().as_ref())).transpose()
|
||||
decode::<T>(self.inner_mut().set_range(start_key.encode().as_ref())).transpose()
|
||||
} else {
|
||||
self.first().transpose()
|
||||
};
|
||||
@@ -132,11 +173,11 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
range: impl RangeBounds<T::Key>,
|
||||
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
|
||||
let start = match range.start_bound().cloned() {
|
||||
Bound::Included(key) => self.inner.set_range(key.encode().as_ref()),
|
||||
Bound::Included(key) => self.inner_mut().set_range(key.encode().as_ref()),
|
||||
Bound::Excluded(_key) => {
|
||||
unreachable!("Rust doesn't allow for Bound::Excluded in starting bounds");
|
||||
}
|
||||
Bound::Unbounded => self.inner.first(),
|
||||
Bound::Unbounded => self.inner_mut().first(),
|
||||
};
|
||||
let start = decode::<T>(start).transpose();
|
||||
Ok(RangeWalker::new(self, start, range.end_bound().cloned()))
|
||||
@@ -147,7 +188,7 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
start_key: Option<T::Key>,
|
||||
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
|
||||
let start = if let Some(start_key) = start_key {
|
||||
decode::<T>(self.inner.set_range(start_key.encode().as_ref()))
|
||||
decode::<T>(self.inner_mut().set_range(start_key.encode().as_ref()))
|
||||
} else {
|
||||
self.last()
|
||||
}
|
||||
@@ -157,20 +198,20 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<'_, K, T> {
|
||||
/// Returns the previous `(key, value)` pair of a DUPSORT table.
|
||||
fn prev_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.prev_dup())
|
||||
decode::<T>(self.inner_mut().prev_dup())
|
||||
}
|
||||
|
||||
/// Returns the next `(key, value)` pair of a DUPSORT table.
|
||||
fn next_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next_dup())
|
||||
decode::<T>(self.inner_mut().next_dup())
|
||||
}
|
||||
|
||||
/// Returns the last `value` of the current duplicate `key`.
|
||||
fn last_dup(&mut self) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.last_dup()
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
@@ -179,12 +220,12 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
|
||||
/// Returns the next `(key, value)` pair skipping the duplicates.
|
||||
fn next_no_dup(&mut self) -> PairResult<T> {
|
||||
decode::<T>(self.inner.next_nodup())
|
||||
decode::<T>(self.inner_mut().next_nodup())
|
||||
}
|
||||
|
||||
/// Returns the next `value` of a duplicate `key`.
|
||||
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.next_dup()
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_value::<T>)
|
||||
@@ -196,7 +237,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
key: <T as Table>::Key,
|
||||
subkey: <T as DupSort>::SubKey,
|
||||
) -> ValueOnlyResult<T> {
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(decode_one::<T>)
|
||||
@@ -216,14 +257,14 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
let start = match (key, subkey) {
|
||||
(Some(key), Some(subkey)) => {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
}
|
||||
(Some(key), None) => {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.set(encoded_key.as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
@@ -231,7 +272,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
(None, Some(subkey)) => {
|
||||
if let Some((key, _)) = self.first()? {
|
||||
let encoded_key = key.encode();
|
||||
self.inner
|
||||
self.inner_mut()
|
||||
.get_both_range(encoded_key.as_ref(), subkey.encode().as_ref())
|
||||
.map_err(|e| DatabaseError::Read(e.into()))?
|
||||
.map(|val| decoder::<T>((Cow::Borrowed(encoded_key.as_ref()), val)))
|
||||
@@ -246,7 +287,7 @@ impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
impl<T: Table> DbCursorRW<T> for Cursor<'_, RW, T> {
|
||||
/// Database operation that will update an existing row if a specified value already
|
||||
/// exists in a table, and insert a new row if the specified value doesn't already exist
|
||||
///
|
||||
@@ -261,17 +302,20 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
Operation::CursorUpsert,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorUpsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
let value_bytes: &[u8] = match value {
|
||||
Some(v) => v,
|
||||
None => &this.buf,
|
||||
};
|
||||
let inner = this.inner.as_mut().expect("cursor already taken");
|
||||
inner.put(key.as_ref(), value_bytes, WriteFlags::UPSERT).map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorUpsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -283,17 +327,20 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
Operation::CursorInsert,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorInsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
let value_bytes: &[u8] = match value {
|
||||
Some(v) => v,
|
||||
None => &this.buf,
|
||||
};
|
||||
let inner = this.inner.as_mut().expect("cursor already taken");
|
||||
inner.put(key.as_ref(), value_bytes, WriteFlags::NO_OVERWRITE).map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorInsert,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
@@ -307,32 +354,37 @@ impl<T: Table> DbCursorRW<T> for Cursor<RW, T> {
|
||||
Operation::CursorAppend,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
let value_bytes: &[u8] = match value {
|
||||
Some(v) => v,
|
||||
None => &this.buf,
|
||||
};
|
||||
let inner = this.inner.as_mut().expect("cursor already taken");
|
||||
inner.put(key.as_ref(), value_bytes, WriteFlags::APPEND).map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn delete_current(&mut self) -> Result<(), DatabaseError> {
|
||||
self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
|
||||
this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
|
||||
this.inner_mut().del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
|
||||
impl<T: DupSort> DbDupCursorRW<T> for Cursor<'_, RW, T> {
|
||||
fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
|
||||
self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
|
||||
this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
|
||||
this.inner_mut()
|
||||
.del(WriteFlags::NO_DUP_DATA)
|
||||
.map_err(|e| DatabaseError::Delete(e.into()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -343,17 +395,20 @@ impl<T: DupSort> DbDupCursorRW<T> for Cursor<RW, T> {
|
||||
Operation::CursorAppendDup,
|
||||
Some(value.unwrap_or(&self.buf).len()),
|
||||
|this| {
|
||||
this.inner
|
||||
.put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
|
||||
.map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppendDup,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
let value_bytes: &[u8] = match value {
|
||||
Some(v) => v,
|
||||
None => &this.buf,
|
||||
};
|
||||
let inner = this.inner.as_mut().expect("cursor already taken");
|
||||
inner.put(key.as_ref(), value_bytes, WriteFlags::APPEND_DUP).map_err(|e| {
|
||||
DatabaseWriteError {
|
||||
info: e.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppendDup,
|
||||
table_name: T::NAME,
|
||||
key: key.into_vec(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
123
crates/storage/db/src/implementation/mdbx/cursor_cache.rs
Normal file
123
crates/storage/db/src/implementation/mdbx/cursor_cache.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
//! Cursor cache for reusing database cursors across operations.
|
||||
//!
|
||||
//! This module provides a cache for MDBX cursors, allowing them to be reused
|
||||
//! rather than recreated for each operation. This reduces overhead in
|
||||
//! cursor-heavy workloads like state iteration.
|
||||
|
||||
use reth_libmdbx::ffi;
|
||||
use std::cell::UnsafeCell;
|
||||
|
||||
/// A wrapper around a raw MDBX cursor pointer that implements `Send`.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// MDBX cursors can be moved between threads as long as the owning transaction
|
||||
/// is also moved. Since `Tx` is `Send`, the cursor can be too. The cursor must
|
||||
/// not be used after the transaction is dropped.
|
||||
#[derive(Debug)]
|
||||
struct SendableCursor(*mut ffi::MDBX_cursor);
|
||||
|
||||
// SAFETY: MDBX cursors can be safely sent between threads when moved with their transaction.
|
||||
// The transaction owns the cursor and ensures it's valid.
|
||||
unsafe impl Send for SendableCursor {}
|
||||
|
||||
/// Cache for raw MDBX cursor pointers.
|
||||
///
|
||||
/// Stores cursors as `(table_index, cursor_ptr)` pairs. Most transactions only use
|
||||
/// a few tables, so a small Vec with linear search is more memory-efficient than
|
||||
/// a large fixed-size array.
|
||||
///
|
||||
/// Cursors are returned to the cache on drop via [`super::cursor::Cursor`] and
|
||||
/// reused on subsequent `cursor_read`/`cursor_write` calls.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The cache stores raw pointers to MDBX cursors. These are only valid while
|
||||
/// the parent transaction is alive. The cache must be dropped before the
|
||||
/// transaction is dropped, which is guaranteed by field ordering in `Tx`.
|
||||
///
|
||||
/// This type uses `UnsafeCell` for interior mutability and manually implements `Sync`.
|
||||
/// This is safe because:
|
||||
/// - MDBX transactions are not actually shared across threads (they're `Send` but single-threaded)
|
||||
/// - The `Sync` bound on `Database::TX` is a historical API requirement, not a real usage pattern
|
||||
/// - All access to the cache happens on a single thread that owns the transaction
|
||||
pub struct CursorCache {
|
||||
/// Cached cursors as `(table_index, cursor_ptr)` pairs.
|
||||
entries: UnsafeCell<Vec<(usize, SendableCursor)>>,
|
||||
}
|
||||
|
||||
// SAFETY: While CursorCache uses UnsafeCell, it's only accessed from a single thread.
|
||||
// The Sync implementation is required because Database::TX requires Sync, but in practice
|
||||
// transactions are never shared across threads - they're moved between threads, not shared.
|
||||
unsafe impl Sync for CursorCache {}
|
||||
|
||||
impl std::fmt::Debug for CursorCache {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("CursorCache").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CursorCache {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl CursorCache {
|
||||
/// Creates a new empty cursor cache.
|
||||
pub const fn new() -> Self {
|
||||
Self { entries: UnsafeCell::new(Vec::new()) }
|
||||
}
|
||||
|
||||
/// Takes a cached cursor for the given table index, if one exists.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This must only be called from a single thread (the one owning the transaction).
|
||||
#[inline]
|
||||
pub fn take(&self, index: usize) -> Option<*mut ffi::MDBX_cursor> {
|
||||
// SAFETY: We only access this from a single thread (the transaction owner)
|
||||
let entries = unsafe { &mut *self.entries.get() };
|
||||
entries.iter().position(|(idx, _)| *idx == index).map(|pos| entries.swap_remove(pos).1 .0)
|
||||
}
|
||||
|
||||
/// Stores a cursor in the cache for the given table index.
|
||||
///
|
||||
/// If a cursor already exists for this index, the old cursor is returned
|
||||
/// and must be closed by the caller.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This must only be called from a single thread (the one owning the transaction).
|
||||
#[inline]
|
||||
pub fn put(
|
||||
&self,
|
||||
index: usize,
|
||||
cursor: *mut ffi::MDBX_cursor,
|
||||
) -> Option<*mut ffi::MDBX_cursor> {
|
||||
// SAFETY: We only access this from a single thread (the transaction owner)
|
||||
let entries = unsafe { &mut *self.entries.get() };
|
||||
// Check if there's already an entry for this index
|
||||
if let Some(pos) = entries.iter().position(|(idx, _)| *idx == index) {
|
||||
let old = std::mem::replace(&mut entries[pos].1, SendableCursor(cursor));
|
||||
Some(old.0)
|
||||
} else {
|
||||
entries.push((index, SendableCursor(cursor)));
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CursorCache {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: We have &mut self, so we have exclusive access
|
||||
let entries = self.entries.get_mut();
|
||||
// Close all cached cursors
|
||||
for (_, SendableCursor(cursor)) in entries.drain(..) {
|
||||
// SAFETY: We own these cursors and are dropping them before the transaction
|
||||
unsafe {
|
||||
ffi::mdbx_cursor_close(cursor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,6 +32,7 @@ use std::{
|
||||
use tx::Tx;
|
||||
|
||||
pub mod cursor;
|
||||
pub mod cursor_cache;
|
||||
pub mod tx;
|
||||
|
||||
mod utils;
|
||||
@@ -597,14 +598,22 @@ impl DatabaseEnv {
|
||||
}
|
||||
|
||||
let tx = self.tx_mut()?;
|
||||
let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
|
||||
let should_commit = {
|
||||
let mut version_cursor = tx.cursor_write::<tables::VersionHistory>()?;
|
||||
|
||||
let last_version = version_cursor.last()?.map(|(_, v)| v);
|
||||
if Some(&version) != last_version.as_ref() {
|
||||
version_cursor.upsert(
|
||||
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
|
||||
&version,
|
||||
)?;
|
||||
let last_version = version_cursor.last()?.map(|(_, v)| v);
|
||||
if Some(&version) == last_version.as_ref() {
|
||||
false
|
||||
} else {
|
||||
version_cursor.upsert(
|
||||
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs(),
|
||||
&version,
|
||||
)?;
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if should_commit {
|
||||
tx.commit()?;
|
||||
}
|
||||
|
||||
@@ -1089,29 +1098,32 @@ mod tests {
|
||||
|
||||
let key_to_insert = 2;
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// INSERT
|
||||
assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
|
||||
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
|
||||
// INSERT (failure)
|
||||
assert!(matches!(
|
||||
cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyExist.into(),
|
||||
operation: DatabaseWriteOperation::CursorInsert,
|
||||
table_name: CanonicalHeaders::NAME,
|
||||
key: key_to_insert.encode().into(),
|
||||
}));
|
||||
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// INSERT
|
||||
assert!(cursor.insert(key_to_insert, &B256::ZERO).is_ok());
|
||||
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
|
||||
// INSERT (failure)
|
||||
assert!(matches!(
|
||||
cursor.insert(key_to_insert, &B256::ZERO).unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyExist.into(),
|
||||
operation: DatabaseWriteOperation::CursorInsert,
|
||||
table_name: CanonicalHeaders::NAME,
|
||||
key: key_to_insert.encode().into(),
|
||||
}));
|
||||
assert_eq!(cursor.current().unwrap(), Some((key_to_insert, B256::ZERO)));
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
// Confirm the result
|
||||
let tx = db.tx().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
}
|
||||
|
||||
@@ -1176,23 +1188,27 @@ mod tests {
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
|
||||
// INSERT (cursor starts at last)
|
||||
cursor.last().unwrap();
|
||||
assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
|
||||
// INSERT (cursor starts at last)
|
||||
cursor.last().unwrap();
|
||||
assert_eq!(cursor.current().unwrap(), Some((9, B256::ZERO)));
|
||||
|
||||
for pos in (2..=8).step_by(2) {
|
||||
assert!(cursor.insert(pos, &B256::ZERO).is_ok());
|
||||
assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
|
||||
for pos in (2..=8).step_by(2) {
|
||||
assert!(cursor.insert(pos, &B256::ZERO).is_ok());
|
||||
assert_eq!(cursor.current().unwrap(), Some((pos, B256::ZERO)));
|
||||
}
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
// Confirm the result
|
||||
let tx = db.tx().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
}
|
||||
|
||||
@@ -1211,15 +1227,19 @@ mod tests {
|
||||
// APPEND
|
||||
let key_to_append = 5;
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
assert!(cursor.append(key_to_append, &B256::ZERO).is_ok());
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
// Confirm the result
|
||||
let tx = db.tx().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
}
|
||||
|
||||
@@ -1238,23 +1258,27 @@ mod tests {
|
||||
// APPEND
|
||||
let key_to_append = 2;
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
assert!(matches!(
|
||||
cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyMismatch.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: CanonicalHeaders::NAME,
|
||||
key: key_to_append.encode().into(),
|
||||
}));
|
||||
assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<CanonicalHeaders>().unwrap();
|
||||
assert!(matches!(
|
||||
cursor.append(key_to_append, &B256::ZERO).unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyMismatch.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: CanonicalHeaders::NAME,
|
||||
key: key_to_append.encode().into(),
|
||||
}));
|
||||
assert_eq!(cursor.current().unwrap(), Some((5, B256::ZERO))); // the end of table
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
// Confirm the result
|
||||
let tx = db.tx().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 3, 4, 5]);
|
||||
{
|
||||
let mut cursor = tx.cursor_read::<CanonicalHeaders>().unwrap();
|
||||
let res = cursor.walk(None).unwrap().map(|res| res.unwrap().0).collect::<Vec<_>>();
|
||||
assert_eq!(res, vec![0, 1, 3, 4, 5]);
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
}
|
||||
|
||||
@@ -1300,43 +1324,30 @@ mod tests {
|
||||
let transition_id = 2;
|
||||
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
|
||||
vec![0, 1, 3, 4, 5]
|
||||
.into_iter()
|
||||
.try_for_each(|val| {
|
||||
cursor.append(
|
||||
transition_id,
|
||||
&AccountBeforeTx { address: Address::with_last_byte(val), info: None },
|
||||
)
|
||||
})
|
||||
.expect(ERROR_APPEND);
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
|
||||
vec![0, 1, 3, 4, 5]
|
||||
.into_iter()
|
||||
.try_for_each(|val| {
|
||||
cursor.append(
|
||||
transition_id,
|
||||
&AccountBeforeTx { address: Address::with_last_byte(val), info: None },
|
||||
)
|
||||
})
|
||||
.expect(ERROR_APPEND);
|
||||
}
|
||||
tx.commit().expect(ERROR_COMMIT);
|
||||
|
||||
// APPEND DUP & APPEND
|
||||
let subkey_to_append = 2;
|
||||
let tx = db.tx_mut().expect(ERROR_INIT_TX);
|
||||
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
|
||||
assert!(matches!(
|
||||
cursor
|
||||
.append_dup(
|
||||
transition_id,
|
||||
AccountBeforeTx {
|
||||
address: Address::with_last_byte(subkey_to_append),
|
||||
info: None
|
||||
}
|
||||
)
|
||||
.unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyMismatch.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppendDup,
|
||||
table_name: AccountChangeSets::NAME,
|
||||
key: transition_id.encode().into(),
|
||||
}));
|
||||
assert!(matches!(
|
||||
{
|
||||
let mut cursor = tx.cursor_write::<AccountChangeSets>().unwrap();
|
||||
assert!(matches!(
|
||||
cursor
|
||||
.append(
|
||||
transition_id - 1,
|
||||
&AccountBeforeTx {
|
||||
.append_dup(
|
||||
transition_id,
|
||||
AccountBeforeTx {
|
||||
address: Address::with_last_byte(subkey_to_append),
|
||||
info: None
|
||||
}
|
||||
@@ -1344,17 +1355,37 @@ mod tests {
|
||||
.unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyMismatch.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
operation: DatabaseWriteOperation::CursorAppendDup,
|
||||
table_name: AccountChangeSets::NAME,
|
||||
key: (transition_id - 1).encode().into(),
|
||||
}
|
||||
));
|
||||
assert!(cursor
|
||||
.append(
|
||||
transition_id,
|
||||
&AccountBeforeTx { address: Address::with_last_byte(subkey_to_append), info: None }
|
||||
)
|
||||
.is_ok());
|
||||
key: transition_id.encode().into(),
|
||||
}));
|
||||
assert!(matches!(
|
||||
cursor
|
||||
.append(
|
||||
transition_id - 1,
|
||||
&AccountBeforeTx {
|
||||
address: Address::with_last_byte(subkey_to_append),
|
||||
info: None
|
||||
}
|
||||
)
|
||||
.unwrap_err(),
|
||||
DatabaseError::Write(err) if *err == DatabaseWriteError {
|
||||
info: Error::KeyMismatch.into(),
|
||||
operation: DatabaseWriteOperation::CursorAppend,
|
||||
table_name: AccountChangeSets::NAME,
|
||||
key: (transition_id - 1).encode().into(),
|
||||
}
|
||||
));
|
||||
assert!(cursor
|
||||
.append(
|
||||
transition_id,
|
||||
&AccountBeforeTx {
|
||||
address: Address::with_last_byte(subkey_to_append),
|
||||
info: None
|
||||
}
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Transaction wrapper for libmdbx-sys.
|
||||
|
||||
use super::{cursor::Cursor, utils::*};
|
||||
use super::{cursor::Cursor, cursor_cache::CursorCache, utils::*};
|
||||
use crate::{
|
||||
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
|
||||
DatabaseError,
|
||||
@@ -40,6 +40,10 @@ pub struct Tx<K: TransactionKind> {
|
||||
///
|
||||
/// If [Some], then metrics are reported.
|
||||
metrics_handler: Option<MetricsHandler<K>>,
|
||||
|
||||
/// Cache for reusing cursors across operations.
|
||||
/// Must be declared after `inner` to ensure cursors are closed before the transaction.
|
||||
cursor_cache: CursorCache,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind> Tx<K> {
|
||||
@@ -59,7 +63,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
Ok(handler)
|
||||
})
|
||||
.transpose()?;
|
||||
Ok(Self { inner, dbis, metrics_handler })
|
||||
Ok(Self { inner, dbis, metrics_handler, cursor_cache: CursorCache::new() })
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner libmdbx transaction.
|
||||
@@ -91,8 +95,8 @@ impl<K: TransactionKind> Tx<K> {
|
||||
self.get_dbi_raw(T::NAME)
|
||||
}
|
||||
|
||||
/// Create db Cursor
|
||||
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
|
||||
/// Create db Cursor without caching (cursor will be closed on drop).
|
||||
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<'_, K, T>, DatabaseError> {
|
||||
let inner = self
|
||||
.inner
|
||||
.cursor_with_dbi(self.get_dbi::<T>()?)
|
||||
@@ -104,6 +108,31 @@ impl<K: TransactionKind> Tx<K> {
|
||||
))
|
||||
}
|
||||
|
||||
/// Create db Cursor with caching (cursor will be returned to cache on drop).
|
||||
///
|
||||
/// If a cursor for this table is already in the cache, it will be reused.
|
||||
/// Otherwise, a new cursor is created. Either way, when the returned cursor
|
||||
/// is dropped, it will be stored in the cache for future reuse.
|
||||
pub fn new_cursor_cached<T: Table>(&self) -> Result<Cursor<'_, K, T>, DatabaseError> {
|
||||
// Try to get a cached cursor
|
||||
let inner = if let Some(raw_cursor) = self.cursor_cache.take(T::INDEX) {
|
||||
// SAFETY: The raw cursor was created for this transaction and the
|
||||
// transaction is still alive (we have &self).
|
||||
unsafe { reth_libmdbx::Cursor::from_raw(&self.inner, raw_cursor) }
|
||||
} else {
|
||||
// Create a new cursor
|
||||
self.inner
|
||||
.cursor_with_dbi(self.get_dbi::<T>()?)
|
||||
.map_err(|e| DatabaseError::InitCursor(e.into()))?
|
||||
};
|
||||
|
||||
Ok(Cursor::new_with_cache(
|
||||
inner,
|
||||
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
|
||||
&self.cursor_cache,
|
||||
))
|
||||
}
|
||||
|
||||
/// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
|
||||
/// record a metric with the provided transaction outcome.
|
||||
///
|
||||
@@ -288,8 +317,14 @@ impl<K: TransactionKind> Drop for MetricsHandler<K> {
|
||||
impl TableImporter for Tx<RW> {}
|
||||
|
||||
impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
type Cursor<T: Table> = Cursor<K, T>;
|
||||
type DupCursor<T: DupSort> = Cursor<K, T>;
|
||||
type Cursor<'tx, T: Table>
|
||||
= Cursor<'tx, K, T>
|
||||
where
|
||||
Self: 'tx;
|
||||
type DupCursor<'tx, T: DupSort>
|
||||
= Cursor<'tx, K, T>
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
|
||||
self.get_by_encoded_key::<T>(&key.encode())
|
||||
@@ -323,13 +358,13 @@ impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
}
|
||||
|
||||
// Iterate over read only values in database.
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
|
||||
self.new_cursor()
|
||||
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<'_, T>, DatabaseError> {
|
||||
self.new_cursor_cached()
|
||||
}
|
||||
|
||||
/// Iterate over read only values in database.
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
|
||||
self.new_cursor()
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<'_, T>, DatabaseError> {
|
||||
self.new_cursor_cached()
|
||||
}
|
||||
|
||||
/// Returns number of entries in the table using cheap DB stats invocation.
|
||||
@@ -401,8 +436,14 @@ impl Tx<RW> {
|
||||
}
|
||||
|
||||
impl DbTxMut for Tx<RW> {
|
||||
type CursorMut<T: Table> = Cursor<RW, T>;
|
||||
type DupCursorMut<T: DupSort> = Cursor<RW, T>;
|
||||
type CursorMut<'tx, T: Table>
|
||||
= Cursor<'tx, RW, T>
|
||||
where
|
||||
Self: 'tx;
|
||||
type DupCursorMut<'tx, T: DupSort>
|
||||
= Cursor<'tx, RW, T>
|
||||
where
|
||||
Self: 'tx;
|
||||
|
||||
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
|
||||
self.put::<T>(PutKind::Upsert, key, value)
|
||||
@@ -436,12 +477,12 @@ impl DbTxMut for Tx<RW> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
|
||||
self.new_cursor()
|
||||
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<'_, T>, DatabaseError> {
|
||||
self.new_cursor_cached()
|
||||
}
|
||||
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
|
||||
self.new_cursor()
|
||||
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<'_, T>, DatabaseError> {
|
||||
self.new_cursor_cached()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ fn bench_get_seq_iter(c: &mut Criterion) {
|
||||
count += 1;
|
||||
}
|
||||
|
||||
fn iterate<K: TransactionKind>(cursor: &mut Cursor<K>) -> Result<()> {
|
||||
fn iterate<K: TransactionKind>(cursor: &mut Cursor<'_, K>) -> Result<()> {
|
||||
let mut i = 0;
|
||||
for result in cursor.iter::<ObjectLength, ObjectLength>() {
|
||||
let (key_len, data_len) = result?;
|
||||
|
||||
@@ -14,26 +14,28 @@ use ffi::{
|
||||
use std::{borrow::Cow, ffi::c_void, fmt, marker::PhantomData, mem, ptr};
|
||||
|
||||
/// A cursor for navigating the items within a database.
|
||||
pub struct Cursor<K>
|
||||
pub struct Cursor<'tx, K>
|
||||
where
|
||||
K: TransactionKind,
|
||||
{
|
||||
txn: Transaction<K>,
|
||||
txn: &'tx Transaction<K>,
|
||||
cursor: *mut ffi::MDBX_cursor,
|
||||
/// Marker to make this type `!Send` and `!Sync`.
|
||||
_marker: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl<K> Cursor<K>
|
||||
impl<'tx, K> Cursor<'tx, K>
|
||||
where
|
||||
K: TransactionKind,
|
||||
{
|
||||
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
|
||||
pub(crate) fn new(txn: &'tx Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
|
||||
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
|
||||
unsafe {
|
||||
txn.txn_execute(|txn_ptr| {
|
||||
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
|
||||
})??;
|
||||
}
|
||||
Ok(Self { txn, cursor })
|
||||
Ok(Self { txn, cursor, _marker: PhantomData })
|
||||
}
|
||||
|
||||
fn new_at_position(other: &Self) -> Result<Self> {
|
||||
@@ -42,7 +44,7 @@ where
|
||||
|
||||
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
|
||||
|
||||
let s = Self { txn: other.txn.clone(), cursor };
|
||||
let s = Self { txn: other.txn, cursor, _marker: PhantomData };
|
||||
|
||||
mdbx_result(res)?;
|
||||
|
||||
@@ -58,14 +60,43 @@ where
|
||||
self.cursor
|
||||
}
|
||||
|
||||
/// Consumes the cursor and returns the raw pointer without closing it.
|
||||
///
|
||||
/// This is useful for cursor caching - the raw pointer can be stored and
|
||||
/// later reconstituted with [`Cursor::from_raw`].
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller takes ownership of the raw cursor pointer and is responsible for:
|
||||
/// - Either closing it via `mdbx_cursor_close`, or
|
||||
/// - Reconstituting it via [`Cursor::from_raw`] before the transaction ends
|
||||
pub const fn into_raw(self) -> *mut ffi::MDBX_cursor {
|
||||
let ptr = self.cursor;
|
||||
mem::forget(self);
|
||||
ptr
|
||||
}
|
||||
|
||||
/// Reconstitutes a cursor from a raw pointer.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure:
|
||||
/// - The pointer was obtained from [`Cursor::into_raw`]
|
||||
/// - The pointer was created for this same transaction (or a prior transaction on the same
|
||||
/// environment that has been renewed)
|
||||
/// - The pointer has not been closed or used to create another Cursor
|
||||
pub const unsafe fn from_raw(txn: &'tx Transaction<K>, cursor: *mut ffi::MDBX_cursor) -> Self {
|
||||
Self { txn, cursor, _marker: PhantomData }
|
||||
}
|
||||
|
||||
/// Returns an iterator over the raw key value slices.
|
||||
pub fn iter_slices<'a>(self) -> IntoIter<K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
|
||||
pub fn iter_slices<'a>(self) -> IntoIter<'tx, K, Cow<'a, [u8]>, Cow<'a, [u8]>> {
|
||||
self.into_iter()
|
||||
}
|
||||
|
||||
/// Returns an iterator over database items.
|
||||
#[expect(clippy::should_implement_trait)]
|
||||
pub fn into_iter<Key, Value>(self) -> IntoIter<K, Key, Value>
|
||||
pub fn into_iter<Key, Value>(self) -> IntoIter<'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -338,7 +369,7 @@ where
|
||||
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
|
||||
/// duplicate data items of each key will be returned before moving on to
|
||||
/// the next key.
|
||||
pub fn iter<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
|
||||
pub fn iter<Key, Value>(&mut self) -> Iter<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -351,7 +382,7 @@ where
|
||||
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
|
||||
/// duplicate data items of each key will be returned before moving on to
|
||||
/// the next key.
|
||||
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, K, Key, Value>
|
||||
pub fn iter_start<Key, Value>(&mut self) -> Iter<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -364,7 +395,7 @@ where
|
||||
/// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), the
|
||||
/// duplicate data items of each key will be returned before moving on to
|
||||
/// the next key.
|
||||
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
|
||||
pub fn iter_from<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -379,7 +410,7 @@ where
|
||||
/// Iterate over duplicate database items. The iterator will begin with the
|
||||
/// item next after the cursor, and continue until the end of the database.
|
||||
/// Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
|
||||
pub fn iter_dup<Key, Value>(&mut self) -> IterDup<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -389,7 +420,7 @@ where
|
||||
|
||||
/// Iterate over duplicate database items starting from the beginning of the
|
||||
/// database. Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, K, Key, Value>
|
||||
pub fn iter_dup_start<Key, Value>(&mut self) -> IterDup<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -399,7 +430,7 @@ where
|
||||
|
||||
/// Iterate over duplicate items in the database starting from the given
|
||||
/// key. Each item will be returned as an iterator of its duplicates.
|
||||
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, K, Key, Value>
|
||||
pub fn iter_dup_from<Key, Value>(&mut self, key: &[u8]) -> IterDup<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -412,7 +443,7 @@ where
|
||||
}
|
||||
|
||||
/// Iterate over the duplicates of the item in the database with the given key.
|
||||
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, K, Key, Value>
|
||||
pub fn iter_dup_of<Key, Value>(&mut self, key: &[u8]) -> Iter<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
@@ -430,7 +461,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl Cursor<RW> {
|
||||
impl Cursor<'_, RW> {
|
||||
/// Puts a key/data pair into the database. The cursor will be positioned at
|
||||
/// the new data item, or on failure usually near it.
|
||||
pub fn put(&mut self, key: &[u8], data: &[u8], flags: WriteFlags) -> Result<()> {
|
||||
@@ -462,7 +493,7 @@ impl Cursor<RW> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> Clone for Cursor<K>
|
||||
impl<K> Clone for Cursor<'_, K>
|
||||
where
|
||||
K: TransactionKind,
|
||||
{
|
||||
@@ -471,7 +502,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> fmt::Debug for Cursor<K>
|
||||
impl<K> fmt::Debug for Cursor<'_, K>
|
||||
where
|
||||
K: TransactionKind,
|
||||
{
|
||||
@@ -480,7 +511,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> Drop for Cursor<K>
|
||||
impl<K> Drop for Cursor<'_, K>
|
||||
where
|
||||
K: TransactionKind,
|
||||
{
|
||||
@@ -502,12 +533,9 @@ const unsafe fn slice_to_val(slice: Option<&[u8]>) -> ffi::MDBX_val {
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<K> Send for Cursor<K> where K: TransactionKind {}
|
||||
unsafe impl<K> Sync for Cursor<K> where K: TransactionKind {}
|
||||
|
||||
/// An iterator over the key/value pairs in an MDBX database.
|
||||
#[derive(Debug)]
|
||||
pub enum IntoIter<K, Key, Value>
|
||||
pub enum IntoIter<'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -526,7 +554,7 @@ where
|
||||
/// fails for some reason.
|
||||
Ok {
|
||||
/// The MDBX cursor with which to iterate.
|
||||
cursor: Cursor<K>,
|
||||
cursor: Cursor<'tx, K>,
|
||||
|
||||
/// The first operation to perform when the consumer calls [`Iter::next()`].
|
||||
op: ffi::MDBX_cursor_op,
|
||||
@@ -538,19 +566,19 @@ where
|
||||
},
|
||||
}
|
||||
|
||||
impl<K, Key, Value> IntoIter<K, Key, Value>
|
||||
impl<'tx, K, Key, Value> IntoIter<'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(cursor: Cursor<K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
|
||||
fn new(cursor: Cursor<'tx, K>, op: ffi::MDBX_cursor_op, next_op: ffi::MDBX_cursor_op) -> Self {
|
||||
Self::Ok { cursor, op, next_op, _marker: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, Key, Value> Iterator for IntoIter<K, Key, Value>
|
||||
impl<K, Key, Value> Iterator for IntoIter<'_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -598,7 +626,7 @@ where
|
||||
|
||||
/// An iterator over the key/value pairs in an MDBX database.
|
||||
#[derive(Debug)]
|
||||
pub enum Iter<'cur, K, Key, Value>
|
||||
pub enum Iter<'cur, 'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -617,7 +645,7 @@ where
|
||||
/// fails for some reason.
|
||||
Ok {
|
||||
/// The MDBX cursor with which to iterate.
|
||||
cursor: &'cur mut Cursor<K>,
|
||||
cursor: &'cur mut Cursor<'tx, K>,
|
||||
|
||||
/// The first operation to perform when the consumer calls [`Iter::next()`].
|
||||
op: ffi::MDBX_cursor_op,
|
||||
@@ -629,7 +657,7 @@ where
|
||||
},
|
||||
}
|
||||
|
||||
impl<'cur, K, Key, Value> Iter<'cur, K, Key, Value>
|
||||
impl<'cur, 'tx, K, Key, Value> Iter<'cur, 'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -637,7 +665,7 @@ where
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(
|
||||
cursor: &'cur mut Cursor<K>,
|
||||
cursor: &'cur mut Cursor<'tx, K>,
|
||||
op: ffi::MDBX_cursor_op,
|
||||
next_op: ffi::MDBX_cursor_op,
|
||||
) -> Self {
|
||||
@@ -645,7 +673,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, Key, Value> Iterator for Iter<'_, K, Key, Value>
|
||||
impl<K, Key, Value> Iterator for Iter<'_, '_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -695,7 +723,7 @@ where
|
||||
///
|
||||
/// The yielded items of the iterator are themselves iterators over the duplicate values for a
|
||||
/// specific key.
|
||||
pub enum IterDup<'cur, K, Key, Value>
|
||||
pub enum IterDup<'cur, 'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -714,7 +742,7 @@ where
|
||||
/// fails for some reason.
|
||||
Ok {
|
||||
/// The MDBX cursor with which to iterate.
|
||||
cursor: &'cur mut Cursor<K>,
|
||||
cursor: &'cur mut Cursor<'tx, K>,
|
||||
|
||||
/// The first operation to perform when the consumer calls `Iter.next()`.
|
||||
op: MDBX_cursor_op,
|
||||
@@ -723,19 +751,19 @@ where
|
||||
},
|
||||
}
|
||||
|
||||
impl<'cur, K, Key, Value> IterDup<'cur, K, Key, Value>
|
||||
impl<'cur, 'tx, K, Key, Value> IterDup<'cur, 'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
{
|
||||
/// Creates a new iterator backed by the given cursor.
|
||||
fn new(cursor: &'cur mut Cursor<K>, op: MDBX_cursor_op) -> Self {
|
||||
fn new(cursor: &'cur mut Cursor<'tx, K>, op: MDBX_cursor_op) -> Self {
|
||||
IterDup::Ok { cursor, op, _marker: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, Key, Value> fmt::Debug for IterDup<'_, K, Key, Value>
|
||||
impl<K, Key, Value> fmt::Debug for IterDup<'_, '_, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
@@ -746,13 +774,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, Key, Value> Iterator for IterDup<'_, K, Key, Value>
|
||||
impl<'tx, K, Key, Value> Iterator for IterDup<'_, 'tx, K, Key, Value>
|
||||
where
|
||||
K: TransactionKind,
|
||||
Key: TableObject,
|
||||
Value: TableObject,
|
||||
{
|
||||
type Item = IntoIter<K, Key, Value>;
|
||||
type Item = IntoIter<'tx, K, Key, Value>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self {
|
||||
@@ -767,7 +795,7 @@ where
|
||||
|
||||
(err_code == ffi::MDBX_SUCCESS).then(|| {
|
||||
IntoIter::new(
|
||||
Cursor::new_at_position(&**cursor).unwrap(),
|
||||
Cursor::new_at_position(cursor).unwrap(),
|
||||
ffi::MDBX_GET_CURRENT,
|
||||
ffi::MDBX_NEXT_DUP,
|
||||
)
|
||||
|
||||
@@ -13,7 +13,7 @@ pub extern crate reth_mdbx_sys as ffi;
|
||||
|
||||
pub use crate::{
|
||||
codec::*,
|
||||
cursor::{Cursor, Iter, IterDup},
|
||||
cursor::{Cursor, IntoIter, Iter, IterDup},
|
||||
database::Database,
|
||||
environment::{
|
||||
Environment, EnvironmentBuilder, EnvironmentKind, Geometry, HandleSlowReadersCallback,
|
||||
|
||||
@@ -253,13 +253,13 @@ where
|
||||
}
|
||||
|
||||
/// Open a new cursor on the given database.
|
||||
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
|
||||
Cursor::new(self.clone(), dbi)
|
||||
pub fn cursor(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<'_, K>> {
|
||||
Cursor::new(self, dbi)
|
||||
}
|
||||
|
||||
/// Open a new cursor on the given dbi.
|
||||
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
|
||||
Cursor::new(self.clone(), dbi)
|
||||
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<'_, K>> {
|
||||
Cursor::new(self, dbi)
|
||||
}
|
||||
|
||||
/// Disables a timeout for this read transaction.
|
||||
|
||||
@@ -37,27 +37,30 @@ use reth_storage_errors::provider::ProviderResult;
|
||||
use strum::{Display, EnumIs};
|
||||
|
||||
/// Type alias for [`EitherReader`] constructors.
|
||||
type EitherReaderTy<'a, P, T> =
|
||||
EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
|
||||
type EitherReaderTy<'a, P, T> = EitherReader<
|
||||
'a,
|
||||
CursorTy<'a, <P as DBProvider>::Tx, T>,
|
||||
<P as NodePrimitivesProvider>::Primitives,
|
||||
>;
|
||||
|
||||
/// Type alias for [`EitherReader`] constructors.
|
||||
type DupEitherReaderTy<'a, P, T> = EitherReader<
|
||||
'a,
|
||||
DupCursorTy<<P as DBProvider>::Tx, T>,
|
||||
DupCursorTy<'a, <P as DBProvider>::Tx, T>,
|
||||
<P as NodePrimitivesProvider>::Primitives,
|
||||
>;
|
||||
|
||||
/// Type alias for dup [`EitherWriter`] constructors.
|
||||
type DupEitherWriterTy<'a, P, T> = EitherWriter<
|
||||
'a,
|
||||
DupCursorMutTy<<P as DBProvider>::Tx, T>,
|
||||
DupCursorMutTy<'a, <P as DBProvider>::Tx, T>,
|
||||
<P as NodePrimitivesProvider>::Primitives,
|
||||
>;
|
||||
|
||||
/// Type alias for [`EitherWriter`] constructors.
|
||||
type EitherWriterTy<'a, P, T> = EitherWriter<
|
||||
'a,
|
||||
CursorMutTy<<P as DBProvider>::Tx, T>,
|
||||
CursorMutTy<'a, <P as DBProvider>::Tx, T>,
|
||||
<P as NodePrimitivesProvider>::Primitives,
|
||||
>;
|
||||
|
||||
@@ -247,7 +250,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
|
||||
pub fn new_storages_history<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
|
||||
where
|
||||
@@ -264,7 +267,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
|
||||
pub fn new_transaction_hash_numbers<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
|
||||
where
|
||||
@@ -283,7 +286,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherWriter`] for account history based on storage settings.
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
@@ -738,7 +741,7 @@ pub enum EitherReader<'a, CURSOR, N> {
|
||||
impl<'a> EitherReader<'a, (), ()> {
|
||||
/// Creates a new [`EitherReader`] for senders based on storage settings.
|
||||
pub fn new_senders<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
|
||||
@@ -756,7 +759,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherReader`] for storages history based on storage settings.
|
||||
pub fn new_storages_history<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_tx: RocksTxRefArg<'a>,
|
||||
) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
|
||||
where
|
||||
@@ -778,7 +781,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
|
||||
pub fn new_transaction_hash_numbers<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_tx: RocksTxRefArg<'a>,
|
||||
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
|
||||
where
|
||||
@@ -800,7 +803,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherReader`] for account history based on storage settings.
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
_rocksdb_tx: RocksTxRefArg<'a>,
|
||||
) -> ProviderResult<EitherReaderTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
@@ -822,7 +825,7 @@ impl<'a> EitherReader<'a, (), ()> {
|
||||
|
||||
/// Creates a new [`EitherReader`] for account changesets based on storage settings.
|
||||
pub fn new_account_changesets<P>(
|
||||
provider: &P,
|
||||
provider: &'a P,
|
||||
) -> ProviderResult<DupEitherReaderTy<'a, P, tables::AccountChangeSets>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
|
||||
|
||||
@@ -4008,57 +4008,59 @@ mod tests {
|
||||
assert_eq!(num_entries, 5);
|
||||
|
||||
// Verify account trie updates were written correctly
|
||||
let tx = provider_rw.tx_ref();
|
||||
let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
|
||||
{
|
||||
let tx = provider_rw.tx_ref();
|
||||
let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
|
||||
|
||||
// Check first account node was updated
|
||||
let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
|
||||
let entry1 = cursor.seek_exact(nibbles1).unwrap();
|
||||
assert!(entry1.is_some(), "Updated account node should exist");
|
||||
let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
|
||||
assert_eq!(
|
||||
entry1.unwrap().1.state_mask,
|
||||
expected_mask,
|
||||
"Account node should have updated state_mask"
|
||||
);
|
||||
// Check first account node was updated
|
||||
let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
|
||||
let entry1 = cursor.seek_exact(nibbles1).unwrap();
|
||||
assert!(entry1.is_some(), "Updated account node should exist");
|
||||
let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
|
||||
assert_eq!(
|
||||
entry1.unwrap().1.state_mask,
|
||||
expected_mask,
|
||||
"Account node should have updated state_mask"
|
||||
);
|
||||
|
||||
// Check deleted account node no longer exists
|
||||
let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
|
||||
let entry2 = cursor.seek_exact(nibbles2).unwrap();
|
||||
assert!(entry2.is_none(), "Deleted account node should not exist");
|
||||
// Check deleted account node no longer exists
|
||||
let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
|
||||
let entry2 = cursor.seek_exact(nibbles2).unwrap();
|
||||
assert!(entry2.is_none(), "Deleted account node should not exist");
|
||||
|
||||
// Check new account node exists
|
||||
let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
|
||||
let entry3 = cursor.seek_exact(nibbles3).unwrap();
|
||||
assert!(entry3.is_some(), "New account node should exist");
|
||||
// Check new account node exists
|
||||
let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
|
||||
let entry3 = cursor.seek_exact(nibbles3).unwrap();
|
||||
assert!(entry3.is_some(), "New account node should exist");
|
||||
|
||||
// Verify storage trie updates were written correctly
|
||||
let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
|
||||
// Verify storage trie updates were written correctly
|
||||
let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
|
||||
|
||||
// Check storage for address1
|
||||
let storage_entries1: Vec<_> = storage_cursor
|
||||
.walk_dup(Some(storage_address1), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
storage_entries1.len(),
|
||||
1,
|
||||
"Storage address1 should have 1 entry after deletion"
|
||||
);
|
||||
assert_eq!(
|
||||
storage_entries1[0].1.nibbles.0,
|
||||
Nibbles::from_nibbles([0x1, 0x0]),
|
||||
"Remaining entry should be [0x1, 0x0]"
|
||||
);
|
||||
// Check storage for address1
|
||||
let storage_entries1: Vec<_> = storage_cursor
|
||||
.walk_dup(Some(storage_address1), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
storage_entries1.len(),
|
||||
1,
|
||||
"Storage address1 should have 1 entry after deletion"
|
||||
);
|
||||
assert_eq!(
|
||||
storage_entries1[0].1.nibbles.0,
|
||||
Nibbles::from_nibbles([0x1, 0x0]),
|
||||
"Remaining entry should be [0x1, 0x0]"
|
||||
);
|
||||
|
||||
// Check storage for address2 was wiped
|
||||
let storage_entries2: Vec<_> = storage_cursor
|
||||
.walk_dup(Some(storage_address2), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
|
||||
// Check storage for address2 was wiped
|
||||
let storage_entries2: Vec<_> = storage_cursor
|
||||
.walk_dup(Some(storage_address2), None)
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap();
|
||||
assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
|
||||
}
|
||||
|
||||
provider_rw.commit().unwrap();
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ use alloy_primitives::{BlockNumber, B256};
|
||||
use metrics::{Counter, Histogram};
|
||||
use parking_lot::RwLock;
|
||||
use reth_chain_state::LazyOverlay;
|
||||
use reth_db_api::DatabaseError;
|
||||
use reth_db_api::{tables, transaction::DbTx, DatabaseError};
|
||||
use reth_errors::{ProviderError, ProviderResult};
|
||||
use reth_metrics::Metrics;
|
||||
use reth_prune_types::PruneSegment;
|
||||
@@ -13,13 +13,15 @@ use reth_storage_api::{
|
||||
StorageChangeSetReader,
|
||||
};
|
||||
use reth_trie::{
|
||||
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
|
||||
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
|
||||
hashed_cursor::{HashedCursorFactory, HashedPostStateCursor, HashedPostStateCursorFactory},
|
||||
trie_cursor::{InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursorFactory},
|
||||
updates::TrieUpdatesSorted,
|
||||
HashedPostStateSorted, KeccakKeyHasher,
|
||||
};
|
||||
use reth_trie_db::{
|
||||
ChangesetCache, DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
|
||||
ChangesetCache, DatabaseAccountTrieCursor, DatabaseHashedAccountCursor,
|
||||
DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseHashedStorageCursor,
|
||||
DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
|
||||
};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
@@ -527,20 +529,21 @@ where
|
||||
Self: 'a;
|
||||
|
||||
fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
|
||||
let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
|
||||
let trie_cursor_factory =
|
||||
InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
|
||||
trie_cursor_factory.account_trie_cursor()
|
||||
let db_cursor = DatabaseAccountTrieCursor::new(
|
||||
self.provider.tx_ref().cursor_read::<tables::AccountsTrie>()?,
|
||||
);
|
||||
Ok(InMemoryTrieCursor::new_account(db_cursor, self.trie_updates.as_ref()))
|
||||
}
|
||||
|
||||
fn storage_trie_cursor(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
|
||||
let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
|
||||
let trie_cursor_factory =
|
||||
InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
|
||||
trie_cursor_factory.storage_trie_cursor(hashed_address)
|
||||
let db_cursor = DatabaseStorageTrieCursor::new(
|
||||
self.provider.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?,
|
||||
hashed_address,
|
||||
);
|
||||
Ok(InMemoryTrieCursor::new_storage(db_cursor, self.trie_updates.as_ref(), hashed_address))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -565,19 +568,24 @@ where
|
||||
Self: 'a;
|
||||
|
||||
fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
|
||||
let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
|
||||
let hashed_cursor_factory =
|
||||
HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
|
||||
hashed_cursor_factory.hashed_account_cursor()
|
||||
let db_cursor = DatabaseHashedAccountCursor::new(
|
||||
self.provider.tx_ref().cursor_read::<tables::HashedAccounts>()?,
|
||||
);
|
||||
Ok(HashedPostStateCursor::new_account(db_cursor, self.hashed_post_state.as_ref()))
|
||||
}
|
||||
|
||||
fn hashed_storage_cursor(
|
||||
&self,
|
||||
hashed_address: B256,
|
||||
) -> Result<Self::StorageCursor<'_>, DatabaseError> {
|
||||
let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
|
||||
let hashed_cursor_factory =
|
||||
HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
|
||||
hashed_cursor_factory.hashed_storage_cursor(hashed_address)
|
||||
let db_cursor = DatabaseHashedStorageCursor::new(
|
||||
self.provider.tx_ref().cursor_dup_read::<tables::HashedStorages>()?,
|
||||
hashed_address,
|
||||
);
|
||||
Ok(HashedPostStateCursor::new_storage(
|
||||
db_cursor,
|
||||
self.hashed_post_state.as_ref(),
|
||||
hashed_address,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,11 +21,11 @@ impl<T> DatabaseHashedCursorFactory<T> {
|
||||
|
||||
impl<TX: DbTx> HashedCursorFactory for DatabaseHashedCursorFactory<&TX> {
|
||||
type AccountCursor<'a>
|
||||
= DatabaseHashedAccountCursor<<TX as DbTx>::Cursor<tables::HashedAccounts>>
|
||||
= DatabaseHashedAccountCursor<<TX as DbTx>::Cursor<'a, tables::HashedAccounts>>
|
||||
where
|
||||
Self: 'a;
|
||||
type StorageCursor<'a>
|
||||
= DatabaseHashedStorageCursor<<TX as DbTx>::DupCursor<tables::HashedStorages>>
|
||||
= DatabaseHashedStorageCursor<<TX as DbTx>::DupCursor<'a, tables::HashedStorages>>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
|
||||
@@ -27,12 +27,12 @@ where
|
||||
TX: DbTx,
|
||||
{
|
||||
type AccountTrieCursor<'a>
|
||||
= DatabaseAccountTrieCursor<<TX as DbTx>::Cursor<tables::AccountsTrie>>
|
||||
= DatabaseAccountTrieCursor<<TX as DbTx>::Cursor<'a, tables::AccountsTrie>>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
type StorageTrieCursor<'a>
|
||||
= DatabaseStorageTrieCursor<<TX as DbTx>::DupCursor<tables::StoragesTrie>>
|
||||
= DatabaseStorageTrieCursor<<TX as DbTx>::DupCursor<'a, tables::StoragesTrie>>
|
||||
where
|
||||
Self: 'a;
|
||||
|
||||
@@ -64,7 +64,7 @@ impl<C> DatabaseAccountTrieCursor<C> {
|
||||
|
||||
impl<C> TrieCursor for DatabaseAccountTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::AccountsTrie> + Send,
|
||||
C: DbCursorRO<tables::AccountsTrie>,
|
||||
{
|
||||
/// Seeks an exact match for the provided key in the account trie.
|
||||
fn seek_exact(
|
||||
@@ -160,7 +160,7 @@ where
|
||||
|
||||
impl<C> TrieCursor for DatabaseStorageTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie>,
|
||||
{
|
||||
/// Seeks an exact match for the given key in the storage trie.
|
||||
fn seek_exact(
|
||||
@@ -202,7 +202,7 @@ where
|
||||
|
||||
impl<C> TrieStorageCursor for DatabaseStorageTrieCursor<C>
|
||||
where
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send,
|
||||
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie>,
|
||||
{
|
||||
fn set_hashed_address(&mut self, hashed_address: B256) {
|
||||
self.hashed_address = hashed_address;
|
||||
|
||||
Reference in New Issue
Block a user