diff --git a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c index ae5de1be4c..0b331deba6 100644 --- a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c +++ b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c @@ -12936,6 +12936,124 @@ int mdbx_txn_renew(MDBX_txn *txn) { return LOG_IFERR(rc); } +int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out) { + if (unlikely(!out)) + return LOG_IFERR(MDBX_EINVAL); + *out = nullptr; + + if (unlikely(!src)) + return LOG_IFERR(MDBX_EINVAL); + + if (unlikely(src->signature != txn_signature)) + return LOG_IFERR(MDBX_EBADSIGN); + + if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0)) + return LOG_IFERR(MDBX_BAD_TXN); + + if (unlikely(src->flags & (MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_PARKED))) + return LOG_IFERR(MDBX_BAD_TXN); + + MDBX_env *const env = src->env; + int rc = check_env(env, true); + if (unlikely(rc != MDBX_SUCCESS)) + return LOG_IFERR(rc); + + if (unlikely(!env->lck_mmap.lck)) + return LOG_IFERR(MDBX_EPERM); + + const txnid_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease); + if (unlikely(src->txnid < snap_oldest)) + return LOG_IFERR(MDBX_MVCC_RETARDED); + + const intptr_t bitmap_bytes = +#if MDBX_ENABLE_DBI_SPARSE + ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(src->dbi_sparse[0])) / CHAR_BIT; +#else + 0; +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + const size_t base = sizeof(MDBX_txn) - sizeof(src->tw) + sizeof(src->to); + const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(src->dbi_seqs[0]) + + env->max_dbi * (sizeof(src->dbs[0]) + sizeof(src->cursors[0]) + sizeof(src->dbi_state[0])); + + MDBX_txn *clone = osal_malloc(size); + if (unlikely(clone == nullptr)) + return LOG_IFERR(MDBX_ENOMEM); + +#if MDBX_DEBUG + memset(clone, 0xCD, size); + VALGRIND_MAKE_MEM_UNDEFINED(clone, size); +#endif /* MDBX_DEBUG */ + memset(clone, 0, base); + + clone->dbs = ptr_disp(clone, base); + clone->cursors = ptr_disp(clone->dbs, 
env->max_dbi * sizeof(clone->dbs[0])); + clone->dbi_state = ptr_disp(clone, size - env->max_dbi * sizeof(clone->dbi_state[0])); + clone->dbi_seqs = ptr_disp(clone->cursors, env->max_dbi * sizeof(clone->cursors[0])); +#if MDBX_ENABLE_DBI_SPARSE + clone->dbi_sparse = ptr_disp(clone->dbi_state, -bitmap_bytes); +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + clone->flags = src->flags; + clone->env = env; + clone->parent = nullptr; + clone->nested = nullptr; + + bsr_t brs = mvcc_bind_slot(env); + if (unlikely(brs.err != MDBX_SUCCESS)) { + osal_free(clone); + return LOG_IFERR(brs.err); + } + clone->to.reader = brs.rslot; + + const uint32_t snapshot_pages_used = src->to.reader ? atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed) + : (uint32_t)src->geo.first_unallocated; + const uint64_t snapshot_pages_retired = + src->to.reader ? atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed) : 0; + + atomic_store32(&clone->to.reader->snapshot_pages_used, snapshot_pages_used, mo_Relaxed); + atomic_store64(&clone->to.reader->snapshot_pages_retired, snapshot_pages_retired, mo_Relaxed); + safe64_write(&clone->to.reader->txnid, src->txnid); + atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease); + + clone->txnid = src->txnid; + clone->front_txnid = src->front_txnid; + clone->geo = src->geo; + clone->canary = src->canary; + clone->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 
0 : osal_thread_self(); + + const size_t n_dbi = src->n_dbi; + clone->n_dbi = n_dbi; + memcpy(clone->dbs, src->dbs, n_dbi * sizeof(clone->dbs[0])); + memcpy(clone->dbi_seqs, src->dbi_seqs, n_dbi * sizeof(clone->dbi_seqs[0])); + memcpy(clone->dbi_state, src->dbi_state, n_dbi * sizeof(clone->dbi_state[0])); +#if MDBX_ENABLE_DBI_SPARSE + memcpy(clone->dbi_sparse, src->dbi_sparse, bitmap_bytes); +#endif /* MDBX_ENABLE_DBI_SPARSE */ + + memset(clone->cursors, 0, env->max_dbi * sizeof(clone->cursors[0])); + clone->userctx = nullptr; + +#if defined(_WIN32) || defined(_WIN64) + const size_t used_bytes = pgno2bytes(env, clone->geo.first_unallocated); + if (((used_bytes > env->geo_in_bytes.lower && env->geo_in_bytes.shrink) || + (globals.running_under_Wine && used_bytes < env->geo_in_bytes.upper && env->geo_in_bytes.grow)) && + (clone->flags & MDBX_NOSTICKYTHREADS) == 0) { + clone->flags |= txn_shrink_allowed; + imports.srwl_AcquireShared(&env->remap_guard); + } +#endif /* Windows */ + + dxb_sanitize_tail(env, clone); + clone->signature = txn_signature; + + DEBUG("clone txn %" PRIaTXN " %p from %p on env %p, root page %" PRIaPGNO "/%" PRIaPGNO, clone->txnid, (void *)clone, + (void *)src, (void *)env, clone->dbs[MAIN_DBI].root, clone->dbs[FREE_DBI].root); + + *out = clone; + return MDBX_SUCCESS; +} + int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) { int rc = check_txn(txn, MDBX_TXN_FINISHED); if (unlikely(rc != MDBX_SUCCESS)) diff --git a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h index 90835d1b9e..84a2097196 100644 --- a/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h +++ b/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.h @@ -4403,6 +4403,49 @@ LIBMDBX_API int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted); * \retval MDBX_EINVAL Transaction handle is NULL. */ LIBMDBX_API int mdbx_txn_renew(MDBX_txn *txn); +/** \brief Clone a read-only transaction. 
+ * \ingroup c_transactions + * + * This creates a new read-only transaction that uses the same MVCC snapshot + * as the source transaction. The cloned transaction can be used independently + * and concurrently with the source transaction (but each transaction must + * still only be used by one thread at a time). + * + * This is useful for parallelizing read operations across multiple threads + * while ensuring all threads see a consistent view of the database. + * + * \note Only read-only transactions can be cloned. + * + * \note The cloned transaction must be aborted or committed independently + * of the source transaction. + * + * \note The source transaction must remain valid (not aborted, reset, or + * committed) for the duration of this call. + * + * \note In sticky-thread mode (the default), the cloned transaction is bound + * to the thread that calls this function. In \ref MDBX_NOSTICKYTHREADS mode, + * the cloned transaction can be used from any thread. + * + * \param [in] src A read-only transaction handle to clone. + * \param [out] out Address where the new \ref MDBX_txn handle will be stored. + * + * \returns A non-zero error value on failure and 0 on success, + * some possible errors are: + * \retval MDBX_PANIC A fatal error occurred earlier and the + * environment must be shut down. + * \retval MDBX_BAD_TXN The source transaction is not read-only, + * has already finished, is parked, or in error. + * \retval MDBX_EBADSIGN Transaction object has invalid signature. + * \retval MDBX_MVCC_RETARDED The MVCC snapshot used by source transaction + * has been reclaimed by a writer. + * \retval MDBX_READERS_FULL The reader lock table is full. + * See \ref mdbx_env_set_maxreaders(). + * \retval MDBX_EPERM Environment opened in exclusive mode + * (no reader table available). + * \retval MDBX_ENOMEM Out of memory. + * \retval MDBX_EINVAL An invalid parameter was specified. 
*/ +LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out); + /** \brief The fours integers markers (aka "canary") associated with the * environment. * \ingroup c_crud diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index e47e71ac26..8e7f0d67cb 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -493,6 +493,23 @@ impl Transaction { Ok(()) } + + /// Creates a new read-only transaction that uses the same MVCC snapshot as this transaction. + /// + /// The cloned transaction can be used independently and concurrently with the source + /// transaction (but each transaction must still only be used by one thread at a time). + /// + /// This is useful for parallelizing read operations across multiple threads while ensuring + /// all threads see a consistent view of the database. + pub fn clone_txn(&self) -> Result { + self.txn_execute(|txn| { + let mut out: *mut ffi::MDBX_txn = ptr::null_mut(); + unsafe { + mdbx_result(ffi::mdbx_txn_clone(txn, &mut out))?; + Ok(Self::new_from_ptr(self.env().clone(), out)) + } + })? 
+ } } impl Transaction { diff --git a/crates/storage/libmdbx-rs/tests/transaction.rs b/crates/storage/libmdbx-rs/tests/transaction.rs index c7e8e3fcd3..8e350a82e1 100644 --- a/crates/storage/libmdbx-rs/tests/transaction.rs +++ b/crates/storage/libmdbx-rs/tests/transaction.rs @@ -373,3 +373,137 @@ fn test_stat_dupsort() { assert_eq!(stat.entries(), 8); } } + +#[test] +fn test_clone_txn_same_snapshot() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_txn().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn1 = env.begin_ro_txn().unwrap(); + let txn1_id = txn1.id().unwrap(); + + let txn2 = txn1.clone_txn().unwrap(); + let txn2_id = txn2.id().unwrap(); + + assert_eq!(txn1_id, txn2_id, "cloned txn should have same txnid"); + + let db1 = txn1.open_db(None).unwrap(); + let db2 = txn2.open_db(None).unwrap(); + + assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key2").unwrap(), Some(*b"val2")); + assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key2").unwrap(), Some(*b"val2")); +} + +#[test] +fn test_clone_txn_independent_lifetime() { + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = env.begin_rw_txn().unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn1 = env.begin_ro_txn().unwrap(); + let txn2 = txn1.clone_txn().unwrap(); + let txn2_id = txn2.id().unwrap(); + + drop(txn1); + + let db2 = txn2.open_db(None).unwrap(); + assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), 
/// Clones one base read-only transaction per worker thread and checks that every clone
/// reports the base transaction id and reads back exactly the values committed before
/// the base transaction was opened.
#[test]
fn test_clone_txn_parallel_reads() {
    let dir = tempdir().unwrap();
    let env = Arc::new(Environment::builder().open(dir.path()).unwrap());

    // Populate key00..key99 -> val00..val99 in a single write transaction.
    let txn = env.begin_rw_txn().unwrap();
    let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
    for i in 0..100 {
        txn.put(
            db.dbi(),
            format!("key{i:02}").as_bytes(),
            format!("val{i:02}").as_bytes(),
            WriteFlags::empty(),
        )
        .unwrap();
    }
    txn.commit().unwrap();

    let base_txn = env.begin_ro_txn().unwrap();
    let base_id = base_txn.id().unwrap();

    let n = 4usize;
    let barrier = Arc::new(Barrier::new(n));
    // Element type is inferred from the `thread::spawn` handles pushed below.
    let mut handles = Vec::with_capacity(n);

    for i in 0..n {
        // Clone on the spawning thread, then move the clone into its worker.
        let cloned_txn = base_txn.clone_txn().unwrap();
        let thread_barrier = barrier.clone();

        handles.push(thread::spawn(move || {
            // Start all workers together so the reads overlap.
            thread_barrier.wait();

            let txn_id = cloned_txn.id().unwrap();
            let db = cloned_txn.open_db(None).unwrap();
            let mut count = 0usize;

            // Each worker reads its own disjoint quarter of the key space.
            for j in (i * 25)..((i + 1) * 25) {
                let val: Cow<'_, [u8]> =
                    cloned_txn.get(db.dbi(), format!("key{j:02}").as_bytes()).unwrap().unwrap();
                assert_eq!(&*val, format!("val{j:02}").as_bytes());
                count += 1;
            }

            (txn_id, count)
        }));
    }

    for handle in handles {
        let (txn_id, count) = handle.join().unwrap();
        assert_eq!(txn_id, base_id);
        assert_eq!(count, 25);
    }
}