From 378c5851d58d2f362009e1c10fb8d409c9566dd4 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Thu, 18 Dec 2025 19:22:54 -0500 Subject: [PATCH] wip --- .../libmdbx-rs/mdbx-sys/libmdbx/mdbx.c | 119 ++++++++++++++++++ .../libmdbx-rs/mdbx-sys/libmdbx/mdbx.h | 29 +++++ crates/storage/libmdbx-rs/src/transaction.rs | 14 +++ .../storage/libmdbx-rs/tests/transaction.rs | 32 +++++ 4 files changed, 194 insertions(+) diff --git a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c index ae5de1be4c..79c9095ca9 100644 --- a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c +++ b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c @@ -12936,6 +12936,125 @@ int mdbx_txn_renew(MDBX_txn *txn) { return LOG_IFERR(rc); } +int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) { /* Builds a separate read-only txn handle carrying src's txnid; *dest stays NULL on every failure path. */ + if (unlikely(!dest)) + return LOG_IFERR(MDBX_EINVAL); + *dest = nullptr; + + int rc = check_txn(src, MDBX_TXN_BLOCKED); + if (unlikely(rc != MDBX_SUCCESS)) + return LOG_IFERR(rc); + + if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0)) /* only read-only transactions may be cloned */ + return LOG_IFERR(MDBX_EINVAL); + + MDBX_env *const env = src->env; + rc = check_env(env, true); + if (unlikely(rc != MDBX_SUCCESS)) + return LOG_IFERR(rc); + + if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0)) /* refused unless the env has MDBX_NOSTICKYTHREADS set */ + return LOG_IFERR(MDBX_TXN_OVERLAPPING); + + MDBX_txn *txn = nullptr; + const intptr_t bitmap_bytes = +#if MDBX_ENABLE_DBI_SPARSE + ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT; +#else + 0; +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to)); /* the header below is sized with the smaller `to` member in place of `tw` */ + const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to); + const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) + + env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0])); /* one allocation: header, then dbs, cursors, dbi_seqs, the optional sparse bitmap and dbi_state, all sized from env->max_dbi */ + + txn = osal_malloc(size); + if (unlikely(txn == nullptr)) + return LOG_IFERR(MDBX_ENOMEM); +#if MDBX_DEBUG + memset(txn, 0xCD,
size); + VALGRIND_MAKE_MEM_UNDEFINED(txn, size); +#endif /* MDBX_DEBUG */ + MDBX_ANALYSIS_ASSUME(size > base); + memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base); /* zero just the fixed header; the trailing arrays are set up separately below */ + txn->dbs = ptr_disp(txn, base); + txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0])); +#if MDBX_DEBUG + txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */ +#endif + txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0])); + txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0])); +#if MDBX_ENABLE_DBI_SPARSE + txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes); +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + txn->env = env; + txn->flags = src->flags & ~txn_state_flags; + txn->parent = nullptr; + txn->nested = nullptr; + + txn->txnid = src->txnid; + txn->front_txnid = src->front_txnid; + txn->geo = src->geo; + txn->canary = src->canary; + txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self(); /* always 0 at this point: the MDBX_NOSTICKYTHREADS check above has already passed */ + + if (unlikely(src->n_dbi > env->max_dbi)) { /* keeps the memcpy() calls below inside arrays sized for max_dbi */ + rc = MDBX_CORRUPTED; + goto bailout; + } + + txn->n_dbi = src->n_dbi; + memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0])); + memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0])); + memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0])); +#if MDBX_ENABLE_DBI_SPARSE + if (bitmap_bytes) + memset(txn->dbi_sparse, 0, bitmap_bytes); +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0])); /* copy only the n_dbi entries src has in use; cursors are not copied and stay zeroed */ + memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0])); + memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0])); +#if MDBX_ENABLE_DBI_SPARSE + if (bitmap_bytes) + memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes); +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + txn->to.reader = nullptr; + if (env->lck_mmap.lck) { /* the clone binds a reader slot of its own rather than reusing src's */ + bsr_t brs = mvcc_bind_slot(env); + if (unlikely(brs.err != MDBX_SUCCESS)) { + rc = brs.err; + goto bailout; +
} + txn->to.reader = brs.rslot; + safe64_reset(&txn->to.reader->txnid, true); + if (src->to.reader) { + atomic_store32(&txn->to.reader->snapshot_pages_used, + atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed); + atomic_store64(&txn->to.reader->snapshot_pages_retired, + atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed); + } else { + atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed); + atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed); + } + safe64_write(&txn->to.reader->txnid, src->txnid); /* src's txnid goes into the new slot only after the snapshot_pages_* fields are filled in */ + atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease); + } + + txn->signature = txn_signature; + txn->userctx = nullptr; /* src's user context is not carried over */ + *dest = txn; + DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env); + return MDBX_SUCCESS; + +bailout: /* reached only before a reader slot is attached to txn, so freeing txn is the whole cleanup */ + osal_free(txn); + return LOG_IFERR(rc); +} + int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) { int rc = check_txn(txn, MDBX_TXN_FINISHED); if (unlikely(rc != MDBX_SUCCESS)) diff --git a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h index 90835d1b9e..2b46dfee95 100644 --- a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h +++ b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h @@ -3882,6 +3882,35 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn, void *context); +/** \brief Clone a read-only transaction snapshot. + * \ingroup c_transactions + * + * Creates a new read-only transaction that uses the same MVCC snapshot as + * the \p src transaction. This allows parallel read operations across threads + * without re-opening a read transaction and re-fetching state.
+ * + * \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS) + * to be enabled for the environment. Otherwise it will return + * \ref MDBX_TXN_OVERLAPPING. + * + * \note The \p src transaction must be an active read-only transaction. + * + * \note Neither the \p src transaction nor the cloned transaction may be + * shared between threads concurrently: each transaction and its cursors must + * be confined to a single thread at a time. + * + * \param [in] src A read-only transaction handle returned by + * \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin(). + * \param [out] dest Address where the cloned \ref MDBX_txn handle will be + * stored. Must not be NULL. + * + * \returns A non-zero error value on failure and 0 on success. + * \retval MDBX_EINVAL Invalid arguments or \p src is not read-only. + * \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled. + * \retval MDBX_READERS_FULL Reader lock table is full. + * \retval MDBX_ENOMEM Out of memory. */ +LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest); + /** \brief Create a transaction for use with the environment. * \ingroup c_transactions * diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index e47e71ac26..f80b97857d 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -483,6 +483,20 @@ impl Transaction { } impl Transaction { + /// Clones this read-only transaction, preserving the same MVCC snapshot. + /// + /// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`). + /// The cloned transaction must not be used concurrently with this transaction from multiple + /// threads.
+ pub fn clone_snapshot(&self) -> Result { + let cloned = self.txn_execute(|txn| { /* the closure receives the raw txn pointer that is handed to the FFI call */ + let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut(); + mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned) /* SAFETY: `&mut cloned` is a live local out-pointer; `txn` is whatever raw pointer txn_execute passes in, presumably valid for the duration of the closure (TODO confirm) */ + })??; /* two nested Results are propagated: the one returned by txn_execute and the closure's own */ + + Ok(Self::new_from_ptr(self.env().clone(), cloned)) /* wrap the raw handle together with a clone of this transaction's env handle */ + } + /// Closes the database handle. /// /// # Safety diff --git a/crates/storage/libmdbx-rs/tests/transaction.rs b/crates/storage/libmdbx-rs/tests/transaction.rs index c7e8e3fcd3..ccabd9e433 100644 --- a/crates/storage/libmdbx-rs/tests/transaction.rs +++ b/crates/storage/libmdbx-rs/tests/transaction.rs @@ -373,3 +373,35 @@ fn test_stat_dupsort() { assert_eq!(stat.entries(), 8); } } + +#[test] +fn test_txn_clone_snapshot() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + { /* seed: k = v1 */ + let txn = env.begin_rw_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + let ro = env.begin_ro_txn().unwrap(); + let clone = ro.clone_snapshot().unwrap(); /* both handles are created before the second write */ + + { /* overwrite k with v2 while ro and clone are still open */ + let txn = env.begin_rw_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + } + + let db = ro.open_db(None).unwrap(); + assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1")); /* the original snapshot still reads v1 */ + + let db = clone.open_db(None).unwrap(); + assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1")); /* and so does the clone */ + + let ro2 = env.begin_ro_txn().unwrap(); + let db = ro2.open_db(None).unwrap(); + assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2")); /* a read transaction begun after the commit reads v2 */ +}