mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(mdbx): io_uring batch prefault for parallel subtxns
- Add io_uring support detection (MDBX_HAVE_IO_URING) for Linux - Implement subtxn_batch_prefault_arena() to prefault all arena pages at subtxn creation using io_uring batch writes - Single io_uring_submit() for N scattered pages vs N pwrite() syscalls - Add subtxn_arena_prefaulted flag to skip redundant mincore checks - Skip mincore_probe() in page_alloc_finalize for batch-prefaulted pages - Crash if io_uring unavailable (no fallback path) Performance: eliminates per-page mincore syscalls for initial arena, front-loads all prefaulting before parallel work begins. Amp-Thread-ID: https://ampcode.com/threads/T-019c2f31-1772-729b-8560-e02485715ed4 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
110
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
110
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
@@ -436,6 +436,22 @@ __extern_C key_t ftok(const char *, int);
|
||||
#include <sys/time.h>
|
||||
#include <sys/uio.h>
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* io_uring support for batch prefaulting in parallel subtxns.
|
||||
* Required for parallel subtxn performance - no fallback path. */
|
||||
|
||||
#ifndef MDBX_HAVE_IO_URING
|
||||
#if (defined(__linux__) || defined(__gnu_linux__))
|
||||
#define MDBX_HAVE_IO_URING 1
|
||||
#else
|
||||
#define MDBX_HAVE_IO_URING 0
|
||||
#endif
|
||||
#endif /* MDBX_HAVE_IO_URING */
|
||||
|
||||
#if MDBX_HAVE_IO_URING
|
||||
#include <liburing.h>
|
||||
#endif /* MDBX_HAVE_IO_URING */
|
||||
|
||||
#endif /*---------------------------------------------------------------------*/
|
||||
|
||||
#if defined(__ANDROID_API__) || defined(ANDROID)
|
||||
@@ -4335,6 +4351,7 @@ struct MDBX_txn {
|
||||
size_t subtxn_arena_hint; /* Original arena hint for this subtxn */
|
||||
size_t subtxn_arena_page_allocations; /* Pages allocated from subtxn_repnl */
|
||||
size_t subtxn_arena_refill_events; /* Times fallback to parent was needed */
|
||||
bool subtxn_arena_prefaulted; /* true if initial arena pages were batch-prefaulted */
|
||||
osal_fastmutex_t *subtxn_alloc_mutex; /* Mutex for synchronized parent allocation */
|
||||
} tw;
|
||||
};
|
||||
@@ -15862,6 +15879,7 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
|
||||
txn->tw.subtxn_pages_from_eof = 0;
|
||||
txn->tw.subtxn_arena_page_allocations = 0;
|
||||
txn->tw.subtxn_arena_refill_events = 0;
|
||||
txn->tw.subtxn_arena_prefaulted = false;
|
||||
txn->tw.subtxn_alloc_mutex = parent->tw.subtxn_alloc_mutex; /* Share parent's mutex */
|
||||
|
||||
/* Insert at head of doubly-linked sibling list */
|
||||
@@ -15878,6 +15896,81 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
/* Batch prefault arena pages for all subtxns using io_uring.
|
||||
* Submits writes for all pages in all subtxns' repnl lists in one batch.
|
||||
* Requires io_uring - will abort if not available. */
|
||||
static void subtxn_batch_prefault_arena(MDBX_env *env, MDBX_txn **subtxns, size_t count) {
|
||||
#if !MDBX_HAVE_IO_URING
|
||||
(void)env; (void)subtxns; (void)count;
|
||||
FATAL("%s", "io_uring required for parallel subtxn prefault");
|
||||
abort();
|
||||
#else
|
||||
/* Count total pages across all subtxns */
|
||||
size_t total_pages = 0;
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
MDBX_txn *txn = subtxns[i];
|
||||
if (txn && txn->tw.subtxn_repnl)
|
||||
total_pages += MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
|
||||
}
|
||||
|
||||
if (total_pages == 0)
|
||||
return;
|
||||
|
||||
/* Get dirty pattern buffer (offset by ps*2 from page_auxbuf) */
|
||||
void *const pattern = ptr_disp(env->page_auxbuf, env->ps * 2);
|
||||
|
||||
struct io_uring ring;
|
||||
if (io_uring_queue_init(total_pages, &ring, 0) < 0) {
|
||||
FATAL("io_uring_queue_init failed: %s", strerror(errno));
|
||||
abort();
|
||||
}
|
||||
|
||||
/* Submit write operations for all scattered pages */
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
MDBX_txn *txn = subtxns[i];
|
||||
if (!txn || !txn->tw.subtxn_repnl)
|
||||
continue;
|
||||
|
||||
const size_t npages = MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
|
||||
for (size_t j = 0; j < npages; ++j) {
|
||||
pgno_t pgno = txn->tw.subtxn_repnl[j + 1]; /* PNL is 1-indexed */
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
|
||||
if (unlikely(!sqe)) {
|
||||
/* SQ full, submit and continue */
|
||||
io_uring_submit(&ring);
|
||||
sqe = io_uring_get_sqe(&ring);
|
||||
if (!sqe)
|
||||
continue; /* Skip this page on failure */
|
||||
}
|
||||
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
|
||||
pgno2bytes(env, pgno));
|
||||
}
|
||||
}
|
||||
|
||||
/* Submit all pending operations */
|
||||
io_uring_submit(&ring);
|
||||
|
||||
/* Wait for all completions */
|
||||
struct io_uring_cqe *cqe;
|
||||
for (size_t i = 0; i < total_pages; ++i) {
|
||||
if (io_uring_wait_cqe(&ring, &cqe) < 0)
|
||||
break;
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
|
||||
io_uring_queue_exit(&ring);
|
||||
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
if (subtxns[i])
|
||||
subtxns[i]->tw.subtxn_arena_prefaulted = true;
|
||||
}
|
||||
|
||||
#if MDBX_ENABLE_PGOP_STAT
|
||||
env->lck->pgops.prefault.weak += total_pages;
|
||||
#endif /* MDBX_ENABLE_PGOP_STAT */
|
||||
#endif /* MDBX_HAVE_IO_URING */
|
||||
}
|
||||
|
||||
/* mdbx_txn_create_subtxns - Create parallel sibling subtransactions.
|
||||
*
|
||||
* INVARIANTS:
|
||||
@@ -16163,6 +16256,7 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
|
||||
subtxns[i]->tw.subtxn_pages_from_eof = 0;
|
||||
subtxns[i]->tw.subtxn_arena_page_allocations = 0;
|
||||
subtxns[i]->tw.subtxn_arena_refill_events = 0;
|
||||
subtxns[i]->tw.subtxn_arena_prefaulted = false;
|
||||
DEBUG("subtxn %zu: assigned %zu GC pages (initial_pages=%zu)",
|
||||
i, MDBX_PNL_GETSIZE(subtxn_pnl), subtxns[i]->tw.subtxn_arena_initial_pages);
|
||||
}
|
||||
@@ -16180,6 +16274,13 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
|
||||
}
|
||||
|
||||
osal_free(gc_alloc);
|
||||
|
||||
/* Batch prefault all arena pages before subtxns start writing.
|
||||
* This front-loads page residency, eliminating per-page mincore checks
|
||||
* during gc_alloc_single. Only prefault if activated for this txn. */
|
||||
if (parent->tw.prefault_write_activated)
|
||||
subtxn_batch_prefault_arena(parent->env, subtxns, count);
|
||||
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -23388,6 +23489,14 @@ static inline pgr_t page_alloc_finalize(MDBX_env *const env, MDBX_txn *const txn
|
||||
* грязной I/O очереди. Из-за этого штраф за лишнюю запись может быть
|
||||
* сравним с избегаемым ненужным чтением. */
|
||||
if (txn->tw.prefault_write_activated) {
|
||||
/* For subtxns with batch-prefaulted arenas, skip per-page mincore
|
||||
* for pages from the initial distribution (not refills) */
|
||||
if ((txn->flags & txn_parallel_subtx) &&
|
||||
txn->tw.subtxn_arena_prefaulted &&
|
||||
txn->tw.subtxn_arena_page_allocations < txn->tw.subtxn_arena_initial_pages) {
|
||||
need_clean = false;
|
||||
goto skip_prefault;
|
||||
}
|
||||
void *const auxbuf = (txn->flags & txn_parallel_subtx) && txn->tw.txn_page_auxbuf
|
||||
? txn->tw.txn_page_auxbuf
|
||||
: env->page_auxbuf;
|
||||
@@ -23428,6 +23537,7 @@ static inline pgr_t page_alloc_finalize(MDBX_env *const env, MDBX_txn *const txn
|
||||
if (cleared == num)
|
||||
need_clean = false;
|
||||
}
|
||||
skip_prefault:;
|
||||
}
|
||||
} else {
|
||||
ret.page = page_shadow_alloc(txn, num);
|
||||
|
||||
Reference in New Issue
Block a user