refactor: transaction internals (#5437)

This commit is contained in:
Matthias Seitz
2023-11-15 15:46:07 +01:00
committed by GitHub
parent 4e1e0463f7
commit de0cca2488
3 changed files with 182 additions and 117 deletions

View File

@@ -2,7 +2,7 @@ use crate::{
error::{mdbx_result, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{txn_execute, TransactionKind, RW},
transaction::{TransactionKind, TransactionPtr, RW},
EnvironmentKind, TableObject, Transaction,
};
use ffi::{
@@ -12,15 +12,14 @@ use ffi::{
MDBX_PREV_NODUP, MDBX_SET, MDBX_SET_KEY, MDBX_SET_LOWERBOUND, MDBX_SET_RANGE,
};
use libc::c_void;
use parking_lot::Mutex;
use std::{borrow::Cow, fmt, marker::PhantomData, mem, ptr, rc::Rc};
use std::{borrow::Cow, fmt, marker::PhantomData, mem, ptr};
/// A cursor for navigating the items within a database.
pub struct Cursor<'txn, K>
where
K: TransactionKind,
{
txn: Rc<Mutex<*mut ffi::MDBX_txn>>,
txn: TransactionPtr,
cursor: *mut ffi::MDBX_cursor,
_marker: PhantomData<fn(&'txn (), K)>,
}
@@ -34,10 +33,9 @@ where
dbi: ffi::MDBX_dbi,
) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
let txn = txn.txn_mutex();
let txn = txn.txn_ptr();
unsafe {
mdbx_result(txn_execute(&txn, |txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?;
mdbx_result(txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?;
}
Ok(Self { txn, cursor, _marker: PhantomData })
}
@@ -81,7 +79,7 @@ where
let mut data_val = slice_to_val(data);
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
txn_execute(&self.txn, |txn| {
self.txn.txn_execute(|txn| {
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
@@ -431,7 +429,7 @@ impl<'txn> Cursor<'txn, RW> {
let mut data_val: ffi::MDBX_val =
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
mdbx_result(unsafe {
txn_execute(&self.txn, |_| {
self.txn.txn_execute(|_| {
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})
})?;
@@ -447,7 +445,7 @@ impl<'txn> Cursor<'txn, RW> {
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
mdbx_result(unsafe {
txn_execute(&self.txn, |_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
})?;
Ok(())
@@ -459,7 +457,7 @@ where
K: TransactionKind,
{
fn clone(&self) -> Self {
txn_execute(&self.txn, |_| Self::new_at_position(self).unwrap())
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
}
}
@@ -468,7 +466,7 @@ where
K: TransactionKind,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Cursor").finish()
f.debug_struct("Cursor").finish_non_exhaustive()
}
}
@@ -477,7 +475,7 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
txn_execute(&self.txn, |_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
}
}
@@ -565,7 +563,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
txn_execute(&cursor.txn, |txn| {
cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, &key) {
@@ -656,7 +654,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
txn_execute(&cursor.txn, |txn| {
cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, &key) {
@@ -753,7 +751,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);
txn_execute(&cursor.txn, |_| {
cursor.txn.txn_execute(|_| {
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };

View File

@@ -1,7 +1,7 @@
use crate::{
environment::EnvironmentKind,
error::{mdbx_result, Result},
transaction::{txn_execute, TransactionKind},
transaction::TransactionKind,
Transaction,
};
use ffi::MDBX_db_flags_t;
@@ -29,9 +29,9 @@ impl<'txn> Database<'txn> {
let c_name = name.map(|n| CString::new(n).unwrap());
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
mdbx_result(txn_execute(&txn.txn_mutex(), |txn| unsafe {
ffi::mdbx_dbi_open(txn, name_ptr, flags, &mut dbi)
}))?;
mdbx_result(
txn.txn_execute(|txn| unsafe { ffi::mdbx_dbi_open(txn, name_ptr, flags, &mut dbi) }),
)?;
Ok(Self::new_from_ptr(dbi))
}

View File

@@ -10,8 +10,12 @@ use indexmap::IndexSet;
use libc::{c_uint, c_void};
use parking_lot::Mutex;
use std::{
fmt, fmt::Debug, marker::PhantomData, mem::size_of, ptr, rc::Rc, slice,
sync::mpsc::sync_channel,
fmt,
fmt::Debug,
marker::PhantomData,
mem::size_of,
ptr, slice,
sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
};
mod private {
@@ -61,11 +65,7 @@ where
K: TransactionKind,
E: EnvironmentKind,
{
txn: Rc<Mutex<*mut ffi::MDBX_txn>>,
primed_dbis: Mutex<IndexSet<ffi::MDBX_dbi>>,
committed: bool,
env: &'env Environment<E>,
_marker: PhantomData<fn(K)>,
inner: Arc<TransactionInner<'env, K, E>>,
}
impl<'env, K, E> Transaction<'env, K, E>
@@ -88,35 +88,43 @@ where
}
pub(crate) fn new_from_ptr(env: &'env Environment<E>, txn: *mut ffi::MDBX_txn) -> Self {
Self {
txn: Rc::new(Mutex::new(txn)),
let inner = TransactionInner {
txn: TransactionPtr::new(txn),
primed_dbis: Mutex::new(IndexSet::new()),
committed: false,
committed: AtomicBool::new(false),
env,
_marker: PhantomData,
}
};
Self { inner: Arc::new(inner) }
}
/// Returns a raw pointer to the underlying MDBX transaction.
/// Executes the given closure once the lock on the transaction is acquired.
///
/// The caller **must** ensure that the pointer is not used after the
/// lifetime of the transaction.
pub(crate) fn txn_mutex(&self) -> Rc<Mutex<*mut ffi::MDBX_txn>> {
self.txn.clone()
#[inline]
pub(crate) fn txn_execute<F: FnOnce(*mut ffi::MDBX_txn) -> T, T>(&self, f: F) -> T {
self.inner.txn_execute(f)
}
pub(crate) fn txn_ptr(&self) -> TransactionPtr {
self.inner.txn.clone()
}
/// Returns a copy of the raw pointer to the underlying MDBX transaction.
#[doc(hidden)]
pub fn txn(&self) -> *mut ffi::MDBX_txn {
*self.txn.lock()
self.inner.txn.txn
}
/// Returns a raw pointer to the MDBX environment.
pub fn env(&self) -> &Environment<E> {
self.env
self.inner.env
}
/// Returns the transaction id.
pub fn id(&self) -> u64 {
txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_txn_id(txn) })
self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
}
/// Gets an item from a database.
@@ -135,7 +143,7 @@ where
ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
let mut data_val: ffi::MDBX_val = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
txn_execute(&self.txn, |txn| unsafe {
self.txn_execute(|txn| unsafe {
match ffi::mdbx_get(txn, dbi, &key_val, &mut data_val) {
ffi::MDBX_SUCCESS => Key::decode_val::<K>(txn, &data_val).map(Some),
ffi::MDBX_NOTFOUND => Ok(None),
@@ -152,28 +160,39 @@ where
}
pub fn prime_for_permaopen(&self, db: Database<'_>) {
self.primed_dbis.lock().insert(db.dbi());
self.inner.primed_dbis.lock().insert(db.dbi());
}
/// Commits the transaction and returns table handles permanently open for the lifetime of
/// `Environment`.
pub fn commit_and_rebind_open_dbs(mut self) -> Result<(bool, Vec<Database<'env>>)> {
let txnlck = self.txn.lock();
let txn = *txnlck;
let result = if K::ONLY_CLEAN {
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) })
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.unwrap()
.send(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender })
.unwrap();
rx.recv().unwrap()
pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, Vec<Database<'env>>)> {
let result = {
let result = self.txn_execute(|txn| {
if K::ONLY_CLEAN {
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) })
} else {
let (sender, rx) = sync_channel(0);
self.env()
.txn_manager()
.unwrap()
.send(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender })
.unwrap();
rx.recv().unwrap()
}
});
self.inner.set_committed();
result
};
self.committed = true;
result.map(|v| {
(v, self.primed_dbis.lock().iter().map(|&dbi| Database::new_from_ptr(dbi)).collect())
(
v,
self.inner
.primed_dbis
.lock()
.iter()
.map(|&dbi| Database::new_from_ptr(dbi))
.collect(),
)
})
}
@@ -188,7 +207,7 @@ where
/// The returned database handle may be shared among any transaction in the environment.
///
/// The database name may not contain the null character.
pub fn open_db<'txn>(&'txn self, name: Option<&str>) -> Result<Database<'txn>> {
pub fn open_db(&self, name: Option<&str>) -> Result<Database<'_>> {
Database::new(self, name, 0)
}
@@ -196,7 +215,7 @@ where
pub fn db_flags<'txn>(&'txn self, db: &Database<'txn>) -> Result<DatabaseFlags> {
let mut flags: c_uint = 0;
unsafe {
mdbx_result(txn_execute(&self.txn, |txn| {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut())
}))?;
}
@@ -215,7 +234,7 @@ where
pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
unsafe {
let mut stat = Stat::new();
mdbx_result(txn_execute(&self.txn, |txn| {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>())
}))?;
Ok(stat)
@@ -233,23 +252,73 @@ where
}
}
pub(crate) fn txn_execute<F: FnOnce(*mut ffi::MDBX_txn) -> T, T>(
txn: &Mutex<*mut ffi::MDBX_txn>,
f: F,
) -> T {
let lck = txn.lock();
(f)(*lck)
/// Internals of a transaction.
struct TransactionInner<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
/// The transaction pointer itself.
txn: TransactionPtr,
/// A set of database handles that are primed for permaopen.
primed_dbis: Mutex<IndexSet<ffi::MDBX_dbi>>,
/// Whether the transaction has committed.
committed: AtomicBool,
env: &'env Environment<E>,
_marker: PhantomData<fn(K)>,
}
impl<'env, K, E> TransactionInner<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
/// Marks the transaction as committed.
fn set_committed(&self) {
self.committed.store(true, std::sync::atomic::Ordering::SeqCst);
}
fn has_committed(&self) -> bool {
self.committed.load(std::sync::atomic::Ordering::SeqCst)
}
#[inline]
fn txn_execute<F: FnOnce(*mut ffi::MDBX_txn) -> T, T>(&self, f: F) -> T {
self.txn.txn_execute(f)
}
}
impl<'env, K, E> Drop for TransactionInner<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
fn drop(&mut self) {
self.txn_execute(|txn| {
if !self.has_committed() {
if K::ONLY_CLEAN {
unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.unwrap()
.send(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender })
.unwrap();
rx.recv().unwrap().unwrap();
}
}
})
}
}
impl<'env, E> Transaction<'env, RW, E>
where
E: EnvironmentKind,
{
fn open_db_with_flags<'txn>(
&'txn self,
name: Option<&str>,
flags: DatabaseFlags,
) -> Result<Database<'txn>> {
fn open_db_with_flags(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database<'_>> {
Database::new(self, name, flags.bits())
}
@@ -265,11 +334,7 @@ where
///
/// This function will fail with [Error::BadRslot] if called by a thread with an open
/// transaction.
pub fn create_db<'txn>(
&'txn self,
name: Option<&str>,
flags: DatabaseFlags,
) -> Result<Database<'txn>> {
pub fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database<'_>> {
self.open_db_with_flags(name, flags | DatabaseFlags::CREATE)
}
@@ -292,7 +357,7 @@ where
ffi::MDBX_val { iov_len: key.len(), iov_base: key.as_ptr() as *mut c_void };
let mut data_val: ffi::MDBX_val =
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
mdbx_result(txn_execute(&self.txn, |txn| unsafe {
mdbx_result(self.txn_execute(|txn| unsafe {
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
}))?;
@@ -315,7 +380,7 @@ where
let mut data_val: ffi::MDBX_val =
ffi::MDBX_val { iov_len: len, iov_base: ptr::null_mut::<c_void>() };
unsafe {
mdbx_result(txn_execute(&self.txn, |txn| {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_put(
txn,
db.dbi(),
@@ -352,7 +417,7 @@ where
});
mdbx_result({
txn_execute(&self.txn, |txn| {
self.txn_execute(|txn| {
if let Some(d) = data_val {
unsafe { ffi::mdbx_del(txn, dbi, &key_val, &d) }
} else {
@@ -369,7 +434,7 @@ where
/// Empties the given database. All items will be removed.
pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(txn_execute(&self.txn, |txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
Ok(())
}
@@ -380,7 +445,7 @@ where
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn drop_db<'txn>(&'txn self, db: Database<'txn>) -> Result<()> {
mdbx_result(txn_execute(&self.txn, |txn| ffi::mdbx_drop(txn, db.dbi(), true)))?;
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?;
Ok(())
}
@@ -396,7 +461,7 @@ where
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database<'_>) -> Result<()> {
mdbx_result(ffi::mdbx_dbi_close(self.env.env(), db.dbi()))?;
mdbx_result(ffi::mdbx_dbi_close(self.env().env(), db.dbi()))?;
Ok(())
}
@@ -405,9 +470,9 @@ where
impl<'env> Transaction<'env, RW, NoWriteMap> {
/// Begins a new nested transaction inside of this transaction.
pub fn begin_nested_txn(&mut self) -> Result<Transaction<'_, RW, NoWriteMap>> {
txn_execute(&self.txn, |txn| {
self.txn_execute(|txn| {
let (tx, rx) = sync_channel(0);
self.env
self.env()
.txn_manager()
.unwrap()
.send(TxnManagerMessage::Begin {
@@ -417,7 +482,7 @@ impl<'env> Transaction<'env, RW, NoWriteMap> {
})
.unwrap();
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env, ptr.0))
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env(), ptr.0))
})
}
}
@@ -428,46 +493,48 @@ where
E: EnvironmentKind,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RoTransaction").finish()
f.debug_struct("RoTransaction").finish_non_exhaustive()
}
}
impl<'env, K, E> Drop for Transaction<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
fn drop(&mut self) {
txn_execute(&self.txn, |txn| {
if !self.committed {
if K::ONLY_CLEAN {
unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.txn_manager()
.unwrap()
.send(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender })
.unwrap();
rx.recv().unwrap().unwrap();
}
}
})
/// A shareable pointer to an MDBX transaction.
#[derive(Clone)]
pub(crate) struct TransactionPtr {
txn: *mut ffi::MDBX_txn,
lock: Arc<Mutex<()>>,
}
impl TransactionPtr {
fn new(txn: *mut ffi::MDBX_txn) -> Self {
Self { txn, lock: Arc::new(Mutex::new(())) }
}
/// Executes the given closure once the lock on the transaction is acquired.
#[inline]
pub(crate) fn txn_execute<F: FnOnce(*mut ffi::MDBX_txn) -> T, T>(&self, f: F) -> T {
let _lck = self.lock.lock();
(f)(self.txn)
}
}
unsafe impl<'env, K, E> Send for Transaction<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
}
// SAFETY: Access to the transaction is synchronized by the lock.
unsafe impl Send for TransactionPtr {}
unsafe impl<'env, K, E> Sync for Transaction<'env, K, E>
where
K: TransactionKind,
E: EnvironmentKind,
{
// SAFETY: Access to the transaction is synchronized by the lock.
unsafe impl Sync for TransactionPtr {}
#[cfg(test)]
mod tests {
use super::*;
use crate::WriteMap;
fn assert_send_sync<T: Send + Sync>() {}
#[allow(dead_code)]
fn test_txn_send_sync() {
assert_send_sync::<Transaction<'_, RO, NoWriteMap>>();
assert_send_sync::<Transaction<'_, RW, NoWriteMap>>();
assert_send_sync::<Transaction<'_, RO, WriteMap>>();
assert_send_sync::<Transaction<'_, RW, WriteMap>>();
}
}