mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-08 23:08:19 -05:00
wip
This commit is contained in:
119
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
119
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
@@ -12936,6 +12936,125 @@ int mdbx_txn_renew(MDBX_txn *txn) {
|
||||
return LOG_IFERR(rc);
|
||||
}
|
||||
|
||||
int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest) {
|
||||
if (unlikely(!dest))
|
||||
return LOG_IFERR(MDBX_EINVAL);
|
||||
*dest = nullptr;
|
||||
|
||||
int rc = check_txn(src, MDBX_TXN_BLOCKED);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return LOG_IFERR(rc);
|
||||
|
||||
if (unlikely((src->flags & MDBX_TXN_RDONLY) == 0))
|
||||
return LOG_IFERR(MDBX_EINVAL);
|
||||
|
||||
MDBX_env *const env = src->env;
|
||||
rc = check_env(env, true);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
return LOG_IFERR(rc);
|
||||
|
||||
if (unlikely((env->flags & MDBX_NOSTICKYTHREADS) == 0))
|
||||
return LOG_IFERR(MDBX_TXN_OVERLAPPING);
|
||||
|
||||
MDBX_txn *txn = nullptr;
|
||||
const intptr_t bitmap_bytes =
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
ceil_powerof2(env->max_dbi, CHAR_BIT * sizeof(txn->dbi_sparse[0])) / CHAR_BIT;
|
||||
#else
|
||||
0;
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
STATIC_ASSERT(sizeof(txn->tw) > sizeof(txn->to));
|
||||
const size_t base = sizeof(MDBX_txn) - sizeof(txn->tw) + sizeof(txn->to);
|
||||
const size_t size = base + (size_t)bitmap_bytes + env->max_dbi * sizeof(txn->dbi_seqs[0]) +
|
||||
env->max_dbi * (sizeof(txn->dbs[0]) + sizeof(txn->cursors[0]) + sizeof(txn->dbi_state[0]));
|
||||
|
||||
txn = osal_malloc(size);
|
||||
if (unlikely(txn == nullptr))
|
||||
return LOG_IFERR(MDBX_ENOMEM);
|
||||
#if MDBX_DEBUG
|
||||
memset(txn, 0xCD, size);
|
||||
VALGRIND_MAKE_MEM_UNDEFINED(txn, size);
|
||||
#endif /* MDBX_DEBUG */
|
||||
MDBX_ANALYSIS_ASSUME(size > base);
|
||||
memset(txn, 0, (MDBX_GOOFY_MSVC_STATIC_ANALYZER && base > size) ? size : base);
|
||||
txn->dbs = ptr_disp(txn, base);
|
||||
txn->cursors = ptr_disp(txn->dbs, env->max_dbi * sizeof(txn->dbs[0]));
|
||||
#if MDBX_DEBUG
|
||||
txn->cursors[FREE_DBI] = nullptr; /* avoid SIGSEGV in an assertion later */
|
||||
#endif
|
||||
txn->dbi_state = ptr_disp(txn, size - env->max_dbi * sizeof(txn->dbi_state[0]));
|
||||
txn->dbi_seqs = ptr_disp(txn->cursors, env->max_dbi * sizeof(txn->cursors[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
txn->dbi_sparse = ptr_disp(txn->dbi_state, -bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
txn->env = env;
|
||||
txn->flags = src->flags & ~txn_state_flags;
|
||||
txn->parent = nullptr;
|
||||
txn->nested = nullptr;
|
||||
|
||||
txn->txnid = src->txnid;
|
||||
txn->front_txnid = src->front_txnid;
|
||||
txn->geo = src->geo;
|
||||
txn->canary = src->canary;
|
||||
txn->owner = (env->flags & MDBX_NOSTICKYTHREADS) ? 0 : osal_thread_self();
|
||||
|
||||
if (unlikely(src->n_dbi > env->max_dbi)) {
|
||||
rc = MDBX_CORRUPTED;
|
||||
goto bailout;
|
||||
}
|
||||
|
||||
txn->n_dbi = src->n_dbi;
|
||||
memset(txn->cursors, 0, env->max_dbi * sizeof(txn->cursors[0]));
|
||||
memset(txn->dbi_state, 0, env->max_dbi * sizeof(txn->dbi_state[0]));
|
||||
memset(txn->dbi_seqs, 0, env->max_dbi * sizeof(txn->dbi_seqs[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
if (bitmap_bytes)
|
||||
memset(txn->dbi_sparse, 0, bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
memcpy(txn->dbs, src->dbs, txn->n_dbi * sizeof(txn->dbs[0]));
|
||||
memcpy(txn->dbi_state, src->dbi_state, txn->n_dbi * sizeof(txn->dbi_state[0]));
|
||||
memcpy(txn->dbi_seqs, src->dbi_seqs, txn->n_dbi * sizeof(txn->dbi_seqs[0]));
|
||||
#if MDBX_ENABLE_DBI_SPARSE
|
||||
if (bitmap_bytes)
|
||||
memcpy(txn->dbi_sparse, src->dbi_sparse, bitmap_bytes);
|
||||
#endif /* MDBX_ENABLE_DBI_SPARSE */
|
||||
|
||||
txn->to.reader = nullptr;
|
||||
if (env->lck_mmap.lck) {
|
||||
bsr_t brs = mvcc_bind_slot(env);
|
||||
if (unlikely(brs.err != MDBX_SUCCESS)) {
|
||||
rc = brs.err;
|
||||
goto bailout;
|
||||
}
|
||||
txn->to.reader = brs.rslot;
|
||||
safe64_reset(&txn->to.reader->txnid, true);
|
||||
if (src->to.reader) {
|
||||
atomic_store32(&txn->to.reader->snapshot_pages_used,
|
||||
atomic_load32(&src->to.reader->snapshot_pages_used, mo_Relaxed), mo_Relaxed);
|
||||
atomic_store64(&txn->to.reader->snapshot_pages_retired,
|
||||
atomic_load64(&src->to.reader->snapshot_pages_retired, mo_Relaxed), mo_Relaxed);
|
||||
} else {
|
||||
atomic_store32(&txn->to.reader->snapshot_pages_used, src->geo.first_unallocated, mo_Relaxed);
|
||||
atomic_store64(&txn->to.reader->snapshot_pages_retired, 0, mo_Relaxed);
|
||||
}
|
||||
safe64_write(&txn->to.reader->txnid, src->txnid);
|
||||
atomic_store32(&env->lck->rdt_refresh_flag, true, mo_AcquireRelease);
|
||||
}
|
||||
|
||||
txn->signature = txn_signature;
|
||||
txn->userctx = nullptr;
|
||||
*dest = txn;
|
||||
DEBUG("clone txn %" PRIaTXN "r %p from %p on env %p", txn->txnid, (void *)txn, (void *)src, (void *)env);
|
||||
return MDBX_SUCCESS;
|
||||
|
||||
bailout:
|
||||
osal_free(txn);
|
||||
return LOG_IFERR(rc);
|
||||
}
|
||||
|
||||
int mdbx_txn_set_userctx(MDBX_txn *txn, void *ctx) {
|
||||
int rc = check_txn(txn, MDBX_TXN_FINISHED);
|
||||
if (unlikely(rc != MDBX_SUCCESS))
|
||||
|
||||
@@ -3882,6 +3882,35 @@ MDBX_NOTHROW_PURE_FUNCTION LIBMDBX_API void *mdbx_env_get_userctx(const MDBX_env
|
||||
LIBMDBX_API int mdbx_txn_begin_ex(MDBX_env *env, MDBX_txn *parent, MDBX_txn_flags_t flags, MDBX_txn **txn,
|
||||
void *context);
|
||||
|
||||
/** \brief Clone a read-only transaction snapshot.
|
||||
* \ingroup c_transactions
|
||||
*
|
||||
* Creates a new read-only transaction that uses the same MVCC snapshot as
|
||||
* the \p src transaction. This allows parallel read operations across threads
|
||||
* without re-opening a read transaction and re-fetching state.
|
||||
*
|
||||
* \note This function requires \ref MDBX_NOSTICKYTHREADS (aka MDBX_NOTLS)
|
||||
* to be enabled for the environment. Otherwise it will return
|
||||
* \ref MDBX_TXN_OVERLAPPING.
|
||||
*
|
||||
* \note The \p src transaction must be an active read-only transaction.
|
||||
*
|
||||
* \note The \p src transaction and the cloned transaction must not be used
|
||||
* concurrently from multiple threads. Each transaction and its cursors must
|
||||
* be confined to a single thread at a time.
|
||||
*
|
||||
* \param [in] src A read-only transaction handle returned by
|
||||
* \ref mdbx_txn_begin_ex() or \ref mdbx_txn_begin().
|
||||
* \param [out] dest Address where the cloned \ref MDBX_txn handle will be
|
||||
* stored. Must not be NULL.
|
||||
*
|
||||
* \returns A non-zero error value on failure and 0 on success.
|
||||
* \retval MDBX_EINVAL Invalid arguments or \p src is not read-only.
|
||||
* \retval MDBX_TXN_OVERLAPPING \ref MDBX_NOSTICKYTHREADS is not enabled.
|
||||
* \retval MDBX_READERS_FULL Reader lock table is full.
|
||||
* \retval MDBX_ENOMEM Out of memory. */
|
||||
LIBMDBX_API int mdbx_txn_clone(const MDBX_txn *src, MDBX_txn **dest);
|
||||
|
||||
/** \brief Create a transaction for use with the environment.
|
||||
* \ingroup c_transactions
|
||||
*
|
||||
|
||||
@@ -483,6 +483,20 @@ impl Transaction<RW> {
|
||||
}
|
||||
|
||||
impl Transaction<RO> {
|
||||
/// Clones this read-only transaction, preserving the same MVCC snapshot.
|
||||
///
|
||||
/// This requires the environment to be opened with `MDBX_NOSTICKYTHREADS` (aka `MDBX_NOTLS`).
|
||||
/// The cloned transaction must not be used concurrently with this transaction from multiple
|
||||
/// threads.
|
||||
pub fn clone_snapshot(&self) -> Result<Self> {
|
||||
let cloned = self.txn_execute(|txn| {
|
||||
let mut cloned: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||
mdbx_result(unsafe { ffi::mdbx_txn_clone(txn, &mut cloned) }).map(|_| cloned)
|
||||
})??;
|
||||
|
||||
Ok(Self::new_from_ptr(self.env().clone(), cloned))
|
||||
}
|
||||
|
||||
/// Closes the database handle.
|
||||
///
|
||||
/// # Safety
|
||||
|
||||
@@ -373,3 +373,35 @@ fn test_stat_dupsort() {
|
||||
assert_eq!(stat.entries(), 8);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_txn_clone_snapshot() {
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder().open(dir.path()).unwrap();
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.open_db(None).unwrap();
|
||||
txn.put(db.dbi(), b"k", b"v1", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
}
|
||||
|
||||
let ro = env.begin_ro_txn().unwrap();
|
||||
let clone = ro.clone_snapshot().unwrap();
|
||||
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
let db = txn.open_db(None).unwrap();
|
||||
txn.put(db.dbi(), b"k", b"v2", WriteFlags::empty()).unwrap();
|
||||
txn.commit().unwrap();
|
||||
}
|
||||
|
||||
let db = ro.open_db(None).unwrap();
|
||||
assert_eq!(ro.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
|
||||
|
||||
let db = clone.open_db(None).unwrap();
|
||||
assert_eq!(clone.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v1"));
|
||||
|
||||
let ro2 = env.begin_ro_txn().unwrap();
|
||||
let db = ro2.open_db(None).unwrap();
|
||||
assert_eq!(ro2.get::<[u8; 2]>(db.dbi(), b"k").unwrap(), Some(*b"v2"));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user