feat(mdbx): io_uring batch prefault for parallel subtxns

- Add io_uring support detection (MDBX_HAVE_IO_URING) for Linux
- Implement subtxn_batch_prefault_arena() to prefault all arena pages
  at subtxn creation using io_uring batch writes
- Single io_uring_submit() for N scattered pages vs N pwrite() syscalls
- Add subtxn_arena_prefaulted flag to skip redundant mincore checks
- Skip mincore_probe() in page_alloc_finalize for batch-prefaulted pages
- Crash if io_uring unavailable (no fallback path)

Performance: eliminates per-page mincore syscalls for initial arena,
front-loads all prefaulting before parallel work begins.

Amp-Thread-ID: https://ampcode.com/threads/T-019c2f31-1772-729b-8560-e02485715ed4
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
joshieDo
2026-02-05 19:19:55 +00:00
parent a31f99c90a
commit 11f2efb2a2

View File

@@ -436,6 +436,22 @@ __extern_C key_t ftok(const char *, int);
#include <sys/time.h>
#include <sys/uio.h>
/*----------------------------------------------------------------------------*/
/* io_uring support for batch prefaulting in parallel subtxns.
* Required for parallel subtxn performance - no fallback path. */
#ifndef MDBX_HAVE_IO_URING
#if (defined(__linux__) || defined(__gnu_linux__))
#define MDBX_HAVE_IO_URING 1
#else
#define MDBX_HAVE_IO_URING 0
#endif
#endif /* MDBX_HAVE_IO_URING */
#if MDBX_HAVE_IO_URING
#include <liburing.h>
#endif /* MDBX_HAVE_IO_URING */
#endif /*---------------------------------------------------------------------*/
#if defined(__ANDROID_API__) || defined(ANDROID)
@@ -4335,6 +4351,7 @@ struct MDBX_txn {
size_t subtxn_arena_hint; /* Original arena hint for this subtxn */
size_t subtxn_arena_page_allocations; /* Pages allocated from subtxn_repnl */
size_t subtxn_arena_refill_events; /* Times fallback to parent was needed */
bool subtxn_arena_prefaulted; /* true if initial arena pages were batch-prefaulted */
osal_fastmutex_t *subtxn_alloc_mutex; /* Mutex for synchronized parent allocation */
} tw;
};
@@ -15862,6 +15879,7 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
txn->tw.subtxn_pages_from_eof = 0;
txn->tw.subtxn_arena_page_allocations = 0;
txn->tw.subtxn_arena_refill_events = 0;
txn->tw.subtxn_arena_prefaulted = false;
txn->tw.subtxn_alloc_mutex = parent->tw.subtxn_alloc_mutex; /* Share parent's mutex */
/* Insert at head of doubly-linked sibling list */
@@ -15878,6 +15896,81 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
return MDBX_SUCCESS;
}
/* Batch prefault arena pages for all subtxns using io_uring.
* Submits writes for all pages in all subtxns' repnl lists in one batch.
* Requires io_uring - will abort if not available. */
static void subtxn_batch_prefault_arena(MDBX_env *env, MDBX_txn **subtxns, size_t count) {
#if !MDBX_HAVE_IO_URING
(void)env; (void)subtxns; (void)count;
FATAL("%s", "io_uring required for parallel subtxn prefault");
abort();
#else
/* Count total pages across all subtxns */
size_t total_pages = 0;
for (size_t i = 0; i < count; ++i) {
MDBX_txn *txn = subtxns[i];
if (txn && txn->tw.subtxn_repnl)
total_pages += MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
}
if (total_pages == 0)
return;
/* Get dirty pattern buffer (offset by ps*2 from page_auxbuf) */
void *const pattern = ptr_disp(env->page_auxbuf, env->ps * 2);
struct io_uring ring;
if (io_uring_queue_init(total_pages, &ring, 0) < 0) {
FATAL("io_uring_queue_init failed: %s", strerror(errno));
abort();
}
/* Submit write operations for all scattered pages */
for (size_t i = 0; i < count; ++i) {
MDBX_txn *txn = subtxns[i];
if (!txn || !txn->tw.subtxn_repnl)
continue;
const size_t npages = MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
for (size_t j = 0; j < npages; ++j) {
pgno_t pgno = txn->tw.subtxn_repnl[j + 1]; /* PNL is 1-indexed */
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (unlikely(!sqe)) {
/* SQ full, submit and continue */
io_uring_submit(&ring);
sqe = io_uring_get_sqe(&ring);
if (!sqe)
continue; /* Skip this page on failure */
}
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
pgno2bytes(env, pgno));
}
}
/* Submit all pending operations */
io_uring_submit(&ring);
/* Wait for all completions */
struct io_uring_cqe *cqe;
for (size_t i = 0; i < total_pages; ++i) {
if (io_uring_wait_cqe(&ring, &cqe) < 0)
break;
io_uring_cqe_seen(&ring, cqe);
}
io_uring_queue_exit(&ring);
for (size_t i = 0; i < count; ++i) {
if (subtxns[i])
subtxns[i]->tw.subtxn_arena_prefaulted = true;
}
#if MDBX_ENABLE_PGOP_STAT
env->lck->pgops.prefault.weak += total_pages;
#endif /* MDBX_ENABLE_PGOP_STAT */
#endif /* MDBX_HAVE_IO_URING */
}
/* mdbx_txn_create_subtxns - Create parallel sibling subtransactions.
*
* INVARIANTS:
@@ -16163,6 +16256,7 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
subtxns[i]->tw.subtxn_pages_from_eof = 0;
subtxns[i]->tw.subtxn_arena_page_allocations = 0;
subtxns[i]->tw.subtxn_arena_refill_events = 0;
subtxns[i]->tw.subtxn_arena_prefaulted = false;
DEBUG("subtxn %zu: assigned %zu GC pages (initial_pages=%zu)",
i, MDBX_PNL_GETSIZE(subtxn_pnl), subtxns[i]->tw.subtxn_arena_initial_pages);
}
@@ -16180,6 +16274,13 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
}
osal_free(gc_alloc);
/* Batch prefault all arena pages before subtxns start writing.
* This front-loads page residency, eliminating per-page mincore checks
* during gc_alloc_single. Only prefault if activated for this txn. */
if (parent->tw.prefault_write_activated)
subtxn_batch_prefault_arena(parent->env, subtxns, count);
return MDBX_SUCCESS;
}
@@ -23388,6 +23489,14 @@ static inline pgr_t page_alloc_finalize(MDBX_env *const env, MDBX_txn *const txn
* грязной I/O очереди. Из-за этого штраф за лишнюю запись может быть
* сравним с избегаемым ненужным чтением. */
if (txn->tw.prefault_write_activated) {
/* For subtxns with batch-prefaulted arenas, skip per-page mincore
* for pages from the initial distribution (not refills) */
if ((txn->flags & txn_parallel_subtx) &&
txn->tw.subtxn_arena_prefaulted &&
txn->tw.subtxn_arena_page_allocations < txn->tw.subtxn_arena_initial_pages) {
need_clean = false;
goto skip_prefault;
}
void *const auxbuf = (txn->flags & txn_parallel_subtx) && txn->tw.txn_page_auxbuf
? txn->tw.txn_page_auxbuf
: env->page_auxbuf;
@@ -23428,6 +23537,7 @@ static inline pgr_t page_alloc_finalize(MDBX_env *const env, MDBX_txn *const txn
if (cleared == num)
need_clean = false;
}
skip_prefault:;
}
} else {
ret.page = page_shadow_alloc(txn, num);