wip: vibed tx clone fn

This commit is contained in:
Dan Cline
2025-12-18 19:07:00 -05:00
parent 1e38c7fea8
commit 299b7f1e74
4 changed files with 312 additions and 0 deletions

View File

@@ -12936,6 +12936,124 @@ int mdbx_txn_renew(MDBX_txn *txn) {
return LOG_IFERR(rc);
}
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out) {
if (unlikely(!out))
return LOG_IFERR(MDBX_EINVAL);
*out = nullptr;
if (unlikely(!src))
return LOG_IFERR(MDBX_EINVAL);
if (unlikely(src->signature != txn_signature))
return LOG_IFERR(MDBX_EBADSIGN);
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
return LOG_IFERR(MDBX_BAD_TXN);
if (unlikely(src->flags & (MDBX_TXN_FINISHED | MDBX_TXN_ERROR | MDBX_TXN_PARKED)))
return LOG_IFERR(MDBX_BAD_TXN);
MDBX_env *const env = src->env;
int rc = check_env(env, true);
if (unlikely(rc != MDBX_SUCCESS))
return LOG_IFERR(rc);
if (unlikely(!env->lck_mmap.lck))
return LOG_IFERR(MDBX_EPERM);
const txnid_t snap_oldest = atomic_load64(&env->lck->cached_oldest, mo_AcquireRelease);
if (unlikely(src->txnid < snap_oldest))
return LOG_IFERR(MDBX_MVCC_RETARDED);
const intptr_t bitmap_bytes =
#if MDBX_ENABLE_DBI_SPARSE
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(src->dbi_sparse[0])) / CHAR_BIT;
#else
0;
#endif /* MDBX_ENABLE_DBI_SPARSE */
const size_t base = sizeof(MDBX_txn) - sizeof(src->tw) + sizeof(src->to);
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(src->dbi_seqs[0]) +
env->max_dbi * (sizeof(src->dbs[0]) + sizeof(src->cursors[0]) + sizeof(src->dbi_state[0]));
MDBX_txn *clone = osal_malloc(size);
if (unlikely(clone == nullptr))
return LOG_IFERR(MDBX_ENOMEM);
#if MDBX_DEBUG
memset(clone, 0xCD, size);
VALGRIND_MAKE_MEM_UNDEFINED(clone, size);
#endif /* MDBX_DEBUG */
memset(clone, 0, base);
clone->dbs = ptr_disp(clone, base);
clone->cursors = ptr_disp(clone->dbs, env->max_dbi * sizeof(clone->dbs[0]));
clone->dbi_state = ptr_disp(clone, size - env->max_dbi * sizeof(clone->dbi_state[0]));
clone->dbi_seqs = ptr_disp(clone->cursors, env->max_dbi * sizeof(clone->cursors[0]));
#if MDBX_ENABLE_DBI_SPARSE
clone->dbi_sparse = ptr_disp(clone->dbi_state, -bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
clone->flags = src->flags;
clone->env = env;
clone->parent = nullptr;
clone->nested = nullptr;
bsr_t brs = mvcc_bind_slot(env);
if (unlikely(brs.err != MDBX_SUCCESS)) {
osal_free(clone);
return LOG_IFERR(brs.err);
}
clone->to.reader = brs.rslot;
const uint32_t snapshot_pages_used = src->to.reader ? atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed)
: (uint32_t)src->geo.first_unallocated;
const uint64_t snapshot_pages_retired =
src->to.reader ? atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed) : 0;
atomic_store32(&clone->to.reader->snapshot_pages_used, snapshot_pages_used, mo_Relaxed);
atomic_store64(&clone->to.reader->snapshot_pages_retired, snapshot_pages_retired, mo_Relaxed);
safe64_write(&clone->to.reader->txnid, src->txnid);
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
clone->txnid = src->txnid;
clone->front_txnid = src->front_txnid;
clone->geo = src->geo;
clone->canary = src->canary;
clone->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
const size_t n_dbi = src->n_dbi;
clone->n_dbi = n_dbi;
memcpy(clone->dbs, src->dbs, n_dbi * sizeof(clone->dbs[0]));
memcpy(clone->dbi_seqs, src->dbi_seqs, n_dbi * sizeof(clone->dbi_seqs[0]));
memcpy(clone->dbi_state, src->dbi_state, n_dbi * sizeof(clone->dbi_state[0]));
#if MDBX_ENABLE_DBI_SPARSE
memcpy(clone->dbi_sparse, src->dbi_sparse, bitmap_bytes);
#endif /* MDBX_ENABLE_DBI_SPARSE */
memset(clone->cursors, 0, env->max_dbi * sizeof(clone->cursors[0]));
clone->userctx = nullptr;
#if defined(_WIN32) || defined(_WIN64)
const size_t used_bytes = pgno2bytes(env, clone->geo.first_unallocated);
if (((used_bytes > env->geo_in_bytes.lower && env->geo_in_bytes.shrink) ||
(globals.running_under_Wine && used_bytes < env->geo_in_bytes.upper && env->geo_in_bytes.grow)) &&
(clone->flags & MDBX_NOSTICKYTHREADS) == 0) {
clone->flags |= txn_shrink_allowed;
imports.srwl_AcquireShared(&env->remap_guard);
}
#endif /* Windows */
dxb_sanitize_tail(env, clone);
clone->signature = txn_signature;
DEBUG("clone txn %" PRIaTXN " %p from %p on env %p, root page %" PRIaPGNO "/%" PRIaPGNO, clone->txnid, (void *)clone,
(void *)src, (void *)env, clone->dbs[MAIN_DBI].root, clone->dbs[FREE_DBI].root);
*out = clone;
return MDBX_SUCCESS;
}
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
int rc = check_txn(txn, MDBX_TXN_FINISHED);
if (unlikely(rc != MDBX_SUCCESS))

View File

@@ -4403,6 +4403,49 @@ LIBMDBX_API int mdbx_txn_unpark(MDBX_txn *txn, bool restart_if_ousted);
* \retval MDBX_EINVAL Transaction handle is NULL. */
LIBMDBX_API int mdbx_txn_renew(MDBX_txn *txn);
/** \brief Clone a read-only transaction.
* \ingroup c_transactions
*
* This creates a new read-only transaction that uses the same MVCC snapshot
* as the source transaction. The cloned transaction can be used independently
* and concurrently with the source transaction (but each transaction must
* still only be used by one thread at a time).
*
* This is useful for parallelizing read operations across multiple threads
* while ensuring all threads see a consistent view of the database.
*
* \note Only read-only transactions can be cloned.
*
* \note The cloned transaction must be aborted or committed independently
* of the source transaction.
*
* \note The source transaction must remain valid (not aborted, reset, or
* committed) for the duration of this call.
*
* \note In sticky-thread mode (the default), the cloned transaction is bound
* to the thread that calls this function. In \ref MDBX_NOSTICKYTHREADS mode,
* the cloned transaction can be used from any thread.
*
* \param [in] src A read-only transaction handle to clone.
* \param [out] out Address where the new \ref MDBX_txn handle will be stored.
*
* \returns A non-zero error value on failure and 0 on success,
* some possible errors are:
* \retval MDBX_PANIC A fatal error occurred earlier and the
* environment must be shut down.
* \retval MDBX_BAD_TXN The source transaction is not read-only,
* has already finished, is parked, or in error.
* \retval MDBX_EBADSIGN Transaction object has invalid signature.
* \retval MDBX_MVCC_RETARDED The MVCC snapshot used by source transaction
* has been reclaimed by a writer.
* \retval MDBX_READERS_FULL The reader lock table is full.
* See \ref mdbx_env_set_maxreaders().
* \retval MDBX_EPERM Environment opened in exclusive mode
* (no reader table available).
* \retval MDBX_ENOMEM Out of memory.
* \retval MDBX_EINVAL An invalid parameter was specified. */
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **out);
/** \brief The fours integers markers (aka "canary") associated with the
* environment.
* \ingroup c_crud

View File

@@ -493,6 +493,23 @@ impl Transaction<RO> {
Ok(())
}
/// Creates a new read-only transaction that uses the same MVCC snapshot as this transaction.
///
/// The cloned transaction can be used independently and concurrently with the source
/// transaction (but each transaction must still only be used by one thread at a time).
///
/// This is useful for parallelizing read operations across multiple threads while ensuring
/// all threads see a consistent view of the database.
pub fn clone_txn(&self) -> Result<Self> {
self.txn_execute(|txn| {
let mut out: *mut ffi::MDBX_txn = ptr::null_mut();
unsafe {
mdbx_result(ffi::mdbx_txn_clone(txn, &mut out))?;
Ok(Self::new_from_ptr(self.env().clone(), out))
}
})?
}
}
impl Transaction<RW> {

View File

@@ -373,3 +373,137 @@ fn test_stat_dupsort() {
assert_eq!(stat.entries(), 8);
}
}
#[test]
fn test_clone_txn_same_snapshot() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
let txn1 = env.begin_ro_txn().unwrap();
let txn1_id = txn1.id().unwrap();
let txn2 = txn1.clone_txn().unwrap();
let txn2_id = txn2.id().unwrap();
assert_eq!(txn1_id, txn2_id, "cloned txn should have same txnid");
let db1 = txn1.open_db(None).unwrap();
let db2 = txn2.open_db(None).unwrap();
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn1.get::<[u8; 4]>(db1.dbi(), b"key2").unwrap(), Some(*b"val2"));
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key2").unwrap(), Some(*b"val2"));
}
#[test]
fn test_clone_txn_independent_lifetime() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
let txn1 = env.begin_ro_txn().unwrap();
let txn2 = txn1.clone_txn().unwrap();
let txn2_id = txn2.id().unwrap();
drop(txn1);
let db2 = txn2.open_db(None).unwrap();
assert_eq!(txn2.get::<[u8; 4]>(db2.dbi(), b"key1").unwrap(), Some(*b"val1"));
assert_eq!(txn2.id().unwrap(), txn2_id);
}
#[test]
fn test_clone_txn_sees_same_snapshot_after_write() {
let dir = tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
txn.put(db.dbi(), b"key1", b"val1", WriteFlags::empty()).unwrap();
txn.commit().unwrap();
let ro_txn = env.begin_ro_txn().unwrap();
let cloned_txn = ro_txn.clone_txn().unwrap();
{
let write_txn = env.begin_rw_txn().unwrap();
let db = write_txn.open_db(None).unwrap();
write_txn.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
write_txn.commit().unwrap();
}
let db1 = ro_txn.open_db(None).unwrap();
let db2 = cloned_txn.open_db(None).unwrap();
assert_eq!(ro_txn.get::<()>(db1.dbi(), b"key2").unwrap(), None);
assert_eq!(cloned_txn.get::<()>(db2.dbi(), b"key2").unwrap(), None);
let new_txn = env.begin_ro_txn().unwrap();
let new_db = new_txn.open_db(None).unwrap();
assert_eq!(new_txn.get::<[u8; 4]>(new_db.dbi(), b"key2").unwrap(), Some(*b"val2"));
}
#[test]
fn test_clone_txn_parallel_reads() {
let dir = tempdir().unwrap();
let env: Arc<Environment> = Arc::new(Environment::builder().open(dir.path()).unwrap());
let txn = env.begin_rw_txn().unwrap();
let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
for i in 0..100 {
txn.put(
db.dbi(),
format!("key{i:02}").as_bytes(),
format!("val{i:02}").as_bytes(),
WriteFlags::empty(),
)
.unwrap();
}
txn.commit().unwrap();
let base_txn = env.begin_ro_txn().unwrap();
let base_id = base_txn.id().unwrap();
let n = 4usize;
let barrier = Arc::new(Barrier::new(n));
let mut handles: Vec<JoinHandle<(u64, usize)>> = Vec::with_capacity(n);
for i in 0..n {
let cloned_txn = base_txn.clone_txn().unwrap();
let thread_barrier = barrier.clone();
handles.push(thread::spawn(move || {
thread_barrier.wait();
let txn_id = cloned_txn.id().unwrap();
let db = cloned_txn.open_db(None).unwrap();
let mut count = 0usize;
for j in (i * 25)..((i + 1) * 25) {
let val: Cow<'_, [u8]> =
cloned_txn.get(db.dbi(), format!("key{j:02}").as_bytes()).unwrap().unwrap();
assert!(val.starts_with(b"val"));
count += 1;
}
(txn_id, count)
}));
}
for handle in handles {
let (txn_id, count) = handle.join().unwrap();
assert_eq!(txn_id, base_id);
assert_eq!(count, 25);
}
}