diff --git a/crates/storage/libmdbx-rs/src/cursor.rs b/crates/storage/libmdbx-rs/src/cursor.rs index 5d30d7abdd..d9dd917591 100644 --- a/crates/storage/libmdbx-rs/src/cursor.rs +++ b/crates/storage/libmdbx-rs/src/cursor.rs @@ -2,7 +2,7 @@ use crate::{ error::{mdbx_result, Error, Result}, flags::*, mdbx_try_optional, - transaction::{txn_execute, TransactionKind, RW}, + transaction::{TransactionKind, TransactionPtr, RW}, EnvironmentKind, TableObject, Transaction, }; use ffi::{ @@ -12,15 +12,14 @@ use ffi::{ MDBX_PREV_NODUP, MDBX_SET, MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE, }; use libc::c_void; -use parking_lot::Mutex; -use std::{borrow::Cow, fmt, marker::PhantomData, mem, ptr, rc::Rc}; +use std::{borrow::Cow, fmt, marker::PhantomData, mem, ptr}; /// A cursor for navigating the items within a database. pub struct Cursor<'txn, K> where K: TransactionKind, { - txn: Rc>, + txn: TransactionPtr, cursor: *mut ffi::MDBX_cursor, _marker: PhantomData, } @@ -34,10 +33,9 @@ where dbi: ffi::MDBX_dbi, ) -> Result { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); - - let txn = txn.txn_mutex(); + let txn = txn.txn_ptr(); unsafe { - mdbx_result(txn_execute(&txn, |txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?; + mdbx_result(txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?; } Ok(Self { txn, cursor, _marker: PhantomData }) } @@ -81,7 +79,7 @@ where let mut data_val = slice_to_val(data); let key_ptr = key_val.iov_base; let data_ptr = data_val.iov_base; - txn_execute(&self.txn, |txn| { + self.txn.txn_execute(|txn| { let v = mdbx_result(ffi::mdbx_cursor_get( self.cursor, &mut key_val, @@ -431,7 +429,7 @@ impl<'txn> Cursor<'txn, RW> { let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void }; mdbx_result(unsafe { - txn_execute(&self.txn, |_| { + self.txn.txn_execute(|_| { ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits()) }) })?; @@ -447,7 +445,7 @@ impl<'txn> Cursor<'txn, RW> { /// current key, if the database was opened with [DatabaseFlags::DUP_SORT]. pub fn del(&mut self, flags: WriteFlags) -> Result<()> { mdbx_result(unsafe { - txn_execute(&self.txn, |_| ffi::mdbx_cursor_del(self.cursor, flags.bits())) + self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits())) })?; Ok(()) @@ -459,7 +457,7 @@ where K: TransactionKind, { fn clone(&self) -> Self { - txn_execute(&self.txn, |_| Self::new_at_position(self).unwrap()) + self.txn.txn_execute(|_| Self::new_at_position(self).unwrap()) } } @@ -468,7 +466,7 @@ where K: TransactionKind, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Cursor").finish() + f.debug_struct("Cursor").finish_non_exhaustive() } } @@ -477,7 +475,7 @@ where K: TransactionKind, { fn drop(&mut self) { - txn_execute(&self.txn, |_| unsafe { ffi::mdbx_cursor_close(self.cursor) }) + self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }) } } @@ -565,7 +563,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - txn_execute(&cursor.txn, |txn| { + cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, &key) { @@ -656,7 +654,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - txn_execute(&cursor.txn, |txn| { + cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, &key) { @@ -753,7 +751,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, ffi::MDBX_NEXT_NODUP); - txn_execute(&cursor.txn, |_| { + cursor.txn.txn_execute(|_| { let err_code = unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; diff --git a/crates/storage/libmdbx-rs/src/database.rs b/crates/storage/libmdbx-rs/src/database.rs index f76a7d66dc..8609c4e49b 100644 --- a/crates/storage/libmdbx-rs/src/database.rs +++ b/crates/storage/libmdbx-rs/src/database.rs @@ -1,7 +1,7 @@ use crate::{ environment::EnvironmentKind, error::{mdbx_result, Result}, - transaction::{txn_execute, TransactionKind}, + transaction::TransactionKind, Transaction, }; use ffi::MDBX_db_flags_t; @@ -29,9 +29,9 @@ impl<'txn> Database<'txn> { let c_name = name.map(|n| CString::new(n).unwrap()); let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() }; let mut dbi: ffi::MDBX_dbi = 0; - mdbx_result(txn_execute(&txn.txn_mutex(), |txn| unsafe { - ffi::mdbx_dbi_open(txn, name_ptr, flags, &mut dbi) - }))?; + mdbx_result( + txn.txn_execute(|txn| unsafe { ffi::mdbx_dbi_open(txn, name_ptr, flags, &mut dbi) }), + )?; Ok(Self::new_from_ptr(dbi)) } diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 229125e9fe..bd8cbae8f0 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -10,8 +10,12 @@ use indexmap::IndexSet; use libc::{c_uint, c_void}; use parking_lot::Mutex; use std::{ - fmt, fmt::Debug, marker::PhantomData, mem::size_of, ptr, rc::Rc, slice, - sync::mpsc::sync_channel, + fmt, + fmt::Debug, + marker::PhantomData, + mem::size_of, + ptr, slice, + sync::{atomic::AtomicBool, mpsc::sync_channel, Arc}, }; mod private { @@ -61,11 +65,7 @@ where K: TransactionKind, E: EnvironmentKind, { - txn: Rc>, - primed_dbis: Mutex>, - committed: bool, - env: &'env Environment, - _marker: PhantomData, + inner: Arc>, } impl<'env, K, E> Transaction<'env, K, E> @@ -88,35 +88,43 @@ where } pub(crate) fn new_from_ptr(env: &'env Environment, txn: *mut ffi::MDBX_txn) -> Self { - Self { - txn: Rc::new(Mutex::new(txn)), + let inner = TransactionInner { + txn: TransactionPtr::new(txn), primed_dbis: Mutex::new(IndexSet::new()), - committed: false, + committed: AtomicBool::new(false), env, _marker: PhantomData, - } + }; + Self { inner: Arc::new(inner) } } - /// Returns a raw pointer to the underlying MDBX transaction. + /// Executes the given closure once the lock on the transaction is acquired. /// /// The caller **must** ensure that the pointer is not used after the /// lifetime of the transaction. - pub(crate) fn txn_mutex(&self) -> Rc> { - self.txn.clone() + #[inline] + pub(crate) fn txn_execute T, T>(&self, f: F) -> T { + self.inner.txn_execute(f) } + pub(crate) fn txn_ptr(&self) -> TransactionPtr { + self.inner.txn.clone() + } + + /// Returns a copy of the raw pointer to the underlying MDBX transaction. + #[doc(hidden)] pub fn txn(&self) -> *mut ffi::MDBX_txn { - *self.txn.lock() + self.inner.txn.txn } /// Returns a raw pointer to the MDBX environment. pub fn env(&self) -> &Environment { - self.env + self.inner.env } /// Returns the transaction id. pub fn id(&self) -> u64 { - txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_txn_id(txn) }) + self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) }) } /// Gets an item from a database. @@ -135,7 +143,7 @@ where ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void }; let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - txn_execute(&self.txn, |txn| unsafe { + self.txn_execute(|txn| unsafe { match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) { ffi::MDBX_SUCCESS => Key::decode_val::(txn, &data_val).map(Some), ffi::MDBX_NOTFOUND => Ok(None), @@ -152,28 +160,39 @@ where } pub fn prime_for_permaopen(&self, db: Database<'_>) { - self.primed_dbis.lock().insert(db.dbi()); + self.inner.primed_dbis.lock().insert(db.dbi()); } /// Commits the transaction and returns table handles permanently open for the lifetime of /// `Environment`. - pub fn commit_and_rebind_open_dbs(mut self) -> Result<(bool, Vec>)> { - let txnlck = self.txn.lock(); - let txn = *txnlck; - let result = if K::ONLY_CLEAN { - mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) }) - } else { - let (sender, rx) = sync_channel(0); - self.env - .txn_manager() - .unwrap() - .send(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender }) - .unwrap(); - rx.recv().unwrap() + pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, Vec>)> { + let result = { + let result = self.txn_execute(|txn| { + if K::ONLY_CLEAN { + mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) }) + } else { + let (sender, rx) = sync_channel(0); + self.env() + .txn_manager() + .unwrap() + .send(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender }) + .unwrap(); + rx.recv().unwrap() + } + }); + self.inner.set_committed(); + result }; - self.committed = true; result.map(|v| { - (v, self.primed_dbis.lock().iter().map(|&dbi| Database::new_from_ptr(dbi)).collect()) + ( + v, + self.inner + .primed_dbis + .lock() + .iter() + .map(|&dbi| Database::new_from_ptr(dbi)) + .collect(), + ) }) } @@ -188,7 +207,7 @@ where /// The returned database handle may be shared among any transaction in the environment. /// /// The database name may not contain the null character. - pub fn open_db<'txn>(&'txn self, name: Option<&str>) -> Result> { + pub fn open_db(&self, name: Option<&str>) -> Result> { Database::new(self, name, 0) } @@ -196,7 +215,7 @@ where pub fn db_flags<'txn>(&'txn self, db: &Database<'txn>) -> Result { let mut flags: c_uint = 0; unsafe { - mdbx_result(txn_execute(&self.txn, |txn| { + mdbx_result(self.txn_execute(|txn| { ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()) }))?; } @@ -215,7 +234,7 @@ where pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result { unsafe { let mut stat = Stat::new(); - mdbx_result(txn_execute(&self.txn, |txn| { + mdbx_result(self.txn_execute(|txn| { ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::()) }))?; Ok(stat) @@ -233,23 +252,73 @@ where } } -pub(crate) fn txn_execute T, T>( - txn: &Mutex<*mut ffi::MDBX_txn>, - f: F, -) -> T { - let lck = txn.lock(); - (f)(*lck) +/// Internals of a transaction. +struct TransactionInner<'env, K, E> +where + K: TransactionKind, + E: EnvironmentKind, +{ + /// The transaction pointer itself. + txn: TransactionPtr, + /// A set of database handles that are primed for permaopen. + primed_dbis: Mutex>, + /// Whether the transaction has committed. + committed: AtomicBool, + env: &'env Environment, + _marker: PhantomData, +} + +impl<'env, K, E> TransactionInner<'env, K, E> +where + K: TransactionKind, + E: EnvironmentKind, +{ + /// Marks the transaction as committed. + fn set_committed(&self) { + self.committed.store(true, std::sync::atomic::Ordering::SeqCst); + } + + fn has_committed(&self) -> bool { + self.committed.load(std::sync::atomic::Ordering::SeqCst) + } + + #[inline] + fn txn_execute T, T>(&self, f: F) -> T { + self.txn.txn_execute(f) + } +} + +impl<'env, K, E> Drop for TransactionInner<'env, K, E> +where + K: TransactionKind, + E: EnvironmentKind, +{ + fn drop(&mut self) { + self.txn_execute(|txn| { + if !self.has_committed() { + if K::ONLY_CLEAN { + unsafe { + ffi::mdbx_txn_abort(txn); + } + } else { + let (sender, rx) = sync_channel(0); + self.env + .txn_manager() + .unwrap() + .send(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender }) + .unwrap(); + rx.recv().unwrap().unwrap(); + } + } + }) + } } impl<'env, E> Transaction<'env, RW, E> where E: EnvironmentKind, { - fn open_db_with_flags<'txn>( - &'txn self, - name: Option<&str>, - flags: DatabaseFlags, - ) -> Result> { + fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result> { Database::new(self, name, flags.bits()) } @@ -265,11 +334,7 @@ where /// /// This function will fail with [Error::BadRslot] if called by a thread with an open /// transaction. - pub fn create_db<'txn>( - &'txn self, - name: Option<&str>, - flags: DatabaseFlags, - ) -> Result> { + pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result> { self.open_db_with_flags(name, flags | DatabaseFlags::CREATE) } @@ -292,7 +357,7 @@ where ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void }; let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void }; - mdbx_result(txn_execute(&self.txn, |txn| unsafe { + mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits()) }))?; @@ -315,7 +380,7 @@ where let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::() }; unsafe { - mdbx_result(txn_execute(&self.txn, |txn| { + mdbx_result(self.txn_execute(|txn| { ffi::mdbx_put( txn, db.dbi(), @@ -352,7 +417,7 @@ where }); mdbx_result({ - txn_execute(&self.txn, |txn| { + self.txn_execute(|txn| { if let Some(d) = data_val { unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) } } else { @@ -369,7 +434,7 @@ where /// Empties the given database. All items will be removed. pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> { - mdbx_result(txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?; + mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?; Ok(()) } @@ -380,7 +445,7 @@ where /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn drop_db<'txn>(&'txn self, db: Database<'txn>) -> Result<()> { - mdbx_result(txn_execute(&self.txn, |txn| ffi::mdbx_drop(txn, db.dbi(), true)))?; + mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?; Ok(()) } @@ -396,7 +461,7 @@ where /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn close_db(&self, db: Database<'_>) -> Result<()> { - mdbx_result(ffi::mdbx_dbi_close(self.env.env(), db.dbi()))?; + mdbx_result(ffi::mdbx_dbi_close(self.env().env(), db.dbi()))?; Ok(()) } @@ -405,9 +470,9 @@ where impl<'env> Transaction<'env, RW, NoWriteMap> { /// Begins a new nested transaction inside of this transaction. pub fn begin_nested_txn(&mut self) -> Result> { - txn_execute(&self.txn, |txn| { + self.txn_execute(|txn| { let (tx, rx) = sync_channel(0); - self.env + self.env() .txn_manager() .unwrap() .send(TxnManagerMessage::Begin { @@ -417,7 +482,7 @@ impl<'env> Transaction<'env, RW, NoWriteMap> { }) .unwrap(); - rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env, ptr.0)) + rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env(), ptr.0)) }) } } @@ -428,46 +493,48 @@ where E: EnvironmentKind, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RoTransaction").finish() + f.debug_struct("RoTransaction").finish_non_exhaustive() } } -impl<'env, K, E> Drop for Transaction<'env, K, E> -where - K: TransactionKind, - E: EnvironmentKind, -{ - fn drop(&mut self) { - txn_execute(&self.txn, |txn| { - if !self.committed { - if K::ONLY_CLEAN { - unsafe { - ffi::mdbx_txn_abort(txn); - } - } else { - let (sender, rx) = sync_channel(0); - self.env - .txn_manager() - .unwrap() - .send(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender }) - .unwrap(); - rx.recv().unwrap().unwrap(); - } - } - }) +/// A shareable pointer to an MDBX transaction. +#[derive(Clone)] +pub(crate) struct TransactionPtr { + txn: *mut ffi::MDBX_txn, + lock: Arc>, +} + +impl TransactionPtr { + fn new(txn: *mut ffi::MDBX_txn) -> Self { + Self { txn, lock: Arc::new(Mutex::new(())) } + } + + /// Executes the given closure once the lock on the transaction is acquired. + #[inline] + pub(crate) fn txn_execute T, T>(&self, f: F) -> T { + let _lck = self.lock.lock(); + (f)(self.txn) } } -unsafe impl<'env, K, E> Send for Transaction<'env, K, E> -where - K: TransactionKind, - E: EnvironmentKind, -{ -} +// SAFETY: Access to the transaction is synchronized by the lock. +unsafe impl Send for TransactionPtr {} -unsafe impl<'env, K, E> Sync for Transaction<'env, K, E> -where - K: TransactionKind, - E: EnvironmentKind, -{ +// SAFETY: Access to the transaction is synchronized by the lock. +unsafe impl Sync for TransactionPtr {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::WriteMap; + + fn assert_send_sync() {} + + #[allow(dead_code)] + fn test_txn_send_sync() { + assert_send_sync::>(); + assert_send_sync::>(); + assert_send_sync::>(); + assert_send_sync::>(); + } }