feat(mdbx): per-subtxn io_uring prefault instead of batch

- Each subtxn now prefaults its OWN arena via io_uring in parallel
- 7 subtxn threads = 7 io_uring rings = true parallel I/O
- Prefault called at start of each write_*_only method
- Removes blocking batch prefault from mdbx_txn_create_subtxns
- Soft-fail if io_uring init fails (prefault is an optimization, not a requirement)

Before: main thread prefaults all 30K pages serially, then spawns threads
After: each thread prefaults its ~4K pages in parallel with siblings
This commit is contained in:
joshieDo
2026-02-05 19:56:28 +00:00
parent f0882a5f8b
commit 6bb0ac111d
6 changed files with 131 additions and 62 deletions

View File

@@ -153,4 +153,15 @@ pub trait DbTxMut: Send {
/// Tracks whether arena hint estimation is working or always hitting floor/cap.
/// This is a no-op by default; implementations may override to record metrics.
fn record_arena_estimation(&self, _table: &'static str, _stats: &ArenaHintEstimationStats) {}
/// Warms up the arena of the subtransaction that is bound to table `T`.
///
/// Intended to be invoked by every subtxn worker right before it starts writing, so that
/// the `io_uring` prefault I/O of sibling subtxns overlaps instead of running back to back.
///
/// The provided implementation does no work and always yields `Ok(false)`; implementations
/// may override it to perform a real prefault.
///
/// # Returns
///
/// * `Ok(true)` - a prefault was performed for the subtxn bound to this table.
/// * `Ok(false)` - no subtxn exists for this table (e.g., parallel writes not enabled).
fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
    Ok(false)
}
}

View File

@@ -568,6 +568,18 @@ impl Tx<RW> {
handler.env_metrics.record_arena_estimation(table, stats);
}
}
/// Requests an `io_uring` arena prefault for the subtransaction bound to table `T`.
///
/// Looks up the DBI of `T` and forwards it to the inner transaction's
/// `prefault_arena_for_dbi`. Meant to be called by each subtxn thread at the start of its
/// work so that sibling subtxns prefault in parallel.
///
/// # Returns
///
/// The flag reported by the inner transaction: `Ok(true)` if a prefault was performed,
/// `Ok(false)` if no subtxn exists for this table (e.g., parallel writes not enabled).
///
/// # Errors
///
/// * Any error produced while resolving the DBI of `T` is propagated unchanged.
/// * A failure of the inner prefault call is wrapped in `DatabaseError::InitCursor`.
pub fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
    let dbi = self.get_dbi::<T>()?;
    // NOTE(review): `InitCursor` is used as the error variant although no cursor is
    // opened here - confirm whether a more specific variant should be used instead.
    match self.inner.prefault_arena_for_dbi(dbi) {
        Ok(performed) => Ok(performed),
        Err(err) => Err(DatabaseError::InitCursor(err.into())),
    }
}
}
impl DbTxMut for Tx<RW> {
@@ -664,6 +676,10 @@ impl DbTxMut for Tx<RW> {
) {
Tx::record_arena_estimation(self, table, stats)
}
// Trait entry point: forwards to the inherent `Tx::prefault_arena_for_table`.
// The call is spelled with an explicit `Tx::` path, in the same way as the
// `record_arena_estimation` forwarding above; keep it that way so the inherent
// method is the one being invoked rather than this trait method itself.
fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
Tx::prefault_arena_for_table::<T>(self)
}
}
#[cfg(test)]

View File

@@ -15896,88 +15896,83 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
return MDBX_SUCCESS;
}
/* Batch prefault arena pages for all subtxns using io_uring.
* Submits writes for all pages in all subtxns' repnl lists in one batch.
* Requires io_uring - will abort if not available. */
static void subtxn_batch_prefault_arena(MDBX_env *env, MDBX_txn **subtxns, size_t count) {
#if !MDBX_HAVE_IO_URING
(void)env; (void)subtxns; (void)count;
FATAL("%s", "io_uring required for parallel subtxn prefault");
abort();
#else
/* Count total pages across all subtxns */
size_t total_pages = 0;
for (size_t i = 0; i < count; ++i) {
MDBX_txn *txn = subtxns[i];
if (txn && txn->tw.subtxn_repnl)
total_pages += MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
}
if (total_pages == 0)
/* Prefault this subtxn's arena pages using io_uring.
* Called by each subtxn thread at the start of its work.
* Each thread has its own io_uring ring - true parallel I/O. */
static void subtxn_prefault_own_arena(MDBX_txn *subtxn) {
if (!(subtxn->flags & txn_parallel_subtx))
return;
if (!subtxn->tw.subtxn_repnl)
return;
DEBUG("io_uring batch prefault: %zu pages across %zu subtxns", total_pages, count);
MDBX_env *env = subtxn->env;
if (!subtxn->tw.prefault_write_activated)
return;
const size_t npages = MDBX_PNL_GETSIZE(subtxn->tw.subtxn_repnl);
if (npages == 0)
return;
#if !MDBX_HAVE_IO_URING
return; /* Silently skip on non-Linux */
#else
DEBUG("io_uring per-subtxn prefault: %zu pages for subtxn %p", npages, (void *)subtxn);
/* Get dirty pattern buffer (offset by ps*2 from page_auxbuf) */
void *const pattern = ptr_disp(env->page_auxbuf, env->ps * 2);
/* Cap ring size - kernel has limits, we'll submit in batches if needed */
const size_t ring_size = total_pages < 32768 ? total_pages : 32768;
const size_t ring_size = npages < 32768 ? npages : 32768;
struct io_uring ring;
int ret = io_uring_queue_init(ring_size, &ring, 0);
if (ret < 0) {
FATAL("io_uring_queue_init(%zu) failed: %s (total_pages=%zu)",
ring_size, strerror(-ret), total_pages);
abort();
}
if (ret < 0)
return; /* Soft fail - prefetch is optimization */
/* Submit write operations for all scattered pages */
for (size_t i = 0; i < count; ++i) {
MDBX_txn *txn = subtxns[i];
if (!txn || !txn->tw.subtxn_repnl)
continue;
const size_t npages = MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
for (size_t j = 0; j < npages; ++j) {
pgno_t pgno = txn->tw.subtxn_repnl[j + 1]; /* PNL is 1-indexed */
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (unlikely(!sqe)) {
/* SQ full, submit and continue */
io_uring_submit(&ring);
sqe = io_uring_get_sqe(&ring);
if (!sqe)
continue; /* Skip this page on failure */
}
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
pgno2bytes(env, pgno));
for (size_t i = 0; i < npages; ++i) {
pgno_t pgno = subtxn->tw.subtxn_repnl[i + 1];
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (unlikely(!sqe)) {
io_uring_submit(&ring);
sqe = io_uring_get_sqe(&ring);
if (!sqe)
continue;
}
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
pgno2bytes(env, pgno));
}
/* Submit all pending operations */
io_uring_submit(&ring);
/* Wait for all completions */
struct io_uring_cqe *cqe;
for (size_t i = 0; i < total_pages; ++i) {
for (size_t i = 0; i < npages; ++i) {
if (io_uring_wait_cqe(&ring, &cqe) < 0)
break;
io_uring_cqe_seen(&ring, cqe);
}
io_uring_queue_exit(&ring);
for (size_t i = 0; i < count; ++i) {
if (subtxns[i])
subtxns[i]->tw.subtxn_arena_prefaulted = true;
}
subtxn->tw.subtxn_arena_prefaulted = true;
#if MDBX_ENABLE_PGOP_STAT
env->lck->pgops.prefault.weak += total_pages;
#endif /* MDBX_ENABLE_PGOP_STAT */
env->lck->pgops.prefault.weak += npages;
#endif
#endif /* MDBX_HAVE_IO_URING */
}
/* Public entry point (used by the Rust bindings) that triggers the arena
 * prefault of a single parallel subtransaction. Intended to be called at the
 * start of each subtxn thread's work.
 *
 * Returns:
 *   MDBX_EINVAL   - txn is NULL, or txn is not a parallel subtransaction;
 *   MDBX_EBADSIGN - txn carries an unexpected signature;
 *   MDBX_SUCCESS  - otherwise, including the case where the arena was already
 *                   marked as prefaulted and nothing had to be done. */
LIBMDBX_API int mdbx_subtxn_prefault_arena(MDBX_txn *txn) {
  int rc = MDBX_SUCCESS;

  if (unlikely(!txn))
    rc = MDBX_EINVAL;
  else if (unlikely(txn->signature != txn_signature))
    rc = MDBX_EBADSIGN;
  else if ((txn->flags & txn_parallel_subtx) == 0)
    rc = MDBX_EINVAL;
  else if (!txn->tw.subtxn_arena_prefaulted)
    /* Not prefaulted yet: do it now; the helper reports no status. */
    subtxn_prefault_own_arena(txn);

  return rc;
}
/* mdbx_txn_create_subtxns - Create parallel sibling subtransactions.
*
* INVARIANTS:
@@ -16282,11 +16277,9 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
osal_free(gc_alloc);
/* Batch prefault all arena pages before subtxns start writing.
* This front-loads page residency, eliminating per-page mincore checks
* during gc_alloc_single. Only prefault if activated for this txn. */
if (parent->tw.prefault_write_activated)
subtxn_batch_prefault_arena(parent->env, subtxns, count);
/* Arena prefault is now per-subtxn via mdbx_subtxn_prefault_arena().
* Rust calls this at the start of each subtxn thread's work,
* enabling true parallel I/O with per-thread io_uring rings. */
return MDBX_SUCCESS;
}

View File

@@ -6771,6 +6771,16 @@ typedef struct MDBX_subtxn_stats {
*/
LIBMDBX_API int mdbx_subtxn_get_stats(const MDBX_txn *subtxn, MDBX_subtxn_stats *stats);
/** \brief Prefault this subtxn's arena pages using io_uring.
*
* Call this at the start of subtxn work to overlap I/O with sibling subtxns.
* Each subtxn thread should call this - they prefault in parallel.
*
* Calling it again once the arena has been marked as prefaulted is a no-op
* that returns success.
*
* \param [in] txn A valid subtransaction handle.
* \returns A non-zero error value on failure and 0 on success.
* \retval MDBX_EINVAL   `txn` is NULL or is not a parallel subtransaction.
* \retval MDBX_EBADSIGN `txn` has an invalid signature.
*/
LIBMDBX_API int mdbx_subtxn_prefault_arena(MDBX_txn *txn);
/** end of c_parallel @} */
/** end of c_api @} */

View File

@@ -162,6 +162,17 @@ impl SubTransaction {
})
})?
}
/// Asks MDBX to prefault the arena pages of this subtransaction via `io_uring`.
///
/// Meant to be the first thing a subtxn thread does, so that the prefault I/O of
/// sibling subtxns runs in parallel.
///
/// # Errors
///
/// Returns an error if the transaction pointer could not be used (see
/// `txn_execute_fail_on_timeout`) or if `mdbx_subtxn_prefault_arena` reports a failure
/// code.
pub fn prefault_arena(&self) -> Result<()> {
    let outcome = self.txn_ptr.txn_execute_fail_on_timeout(|ptr| {
        // SAFETY: `ptr` is the raw handle handed to this closure by
        // `txn_execute_fail_on_timeout`; presumably it is valid for the duration of the
        // call - TODO confirm against that helper's contract.
        let rc = unsafe { ffi::mdbx_subtxn_prefault_arena(ptr) };
        mdbx_result(rc)
    })?;
    // The success payload of the FFI result carries no information here; only
    // failures are surfaced to the caller.
    outcome?;
    Ok(())
}
}
impl<K> Transaction<K>
@@ -877,6 +888,27 @@ impl Transaction<RW> {
self.inner.txn.clone()
}
/// Requests an `io_uring` arena prefault for the subtransaction registered under `dbi`.
///
/// Meant to be called by each subtxn thread at the start of its work so that sibling
/// subtxns prefault in parallel.
///
/// # Returns
///
/// * `Ok(true)` - a subtxn is registered for `dbi` and its prefault call succeeded.
/// * `Ok(false)` - parallel writes are not enabled, or no subtxn exists for `dbi`.
///
/// # Errors
///
/// Propagates any error returned by the subtxn's `prefault_arena`.
pub fn prefault_arena_for_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<bool> {
    let enabled = self.parallel_writes_enabled.load(std::sync::atomic::Ordering::SeqCst);
    if !enabled {
        return Ok(false);
    }

    // The read guard stays alive for the whole prefault call, exactly as long as the
    // subtxn reference borrowed from it.
    let registry = self.subtxns.read();
    match registry.get(&dbi) {
        Some(subtxn) => {
            subtxn.prefault_arena()?;
            Ok(true)
        }
        None => Ok(false),
    }
}
/// Aborts all subtransactions.
///
/// This discards all changes made through subtransactions.

View File

@@ -652,6 +652,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
) -> ProviderResult<Duration> {
let start = Instant::now();
self.tx_ref().prefault_arena_for_table::<tables::PlainAccountState>()?;
let mut cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
for (address, account) in accounts {
if let Some(account) = account {
@@ -671,6 +672,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
pub fn write_bytecodes_only(&self, contracts: &[(B256, Bytecode)]) -> ProviderResult<Duration> {
let start = Instant::now();
self.tx_ref().prefault_arena_for_table::<tables::Bytecodes>()?;
let mut cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in contracts {
cursor.upsert(*hash, bytecode)?;
@@ -689,6 +691,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
) -> ProviderResult<Duration> {
let start = Instant::now();
self.tx_ref().prefault_arena_for_table::<tables::PlainStorageState>()?;
let mut cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for PreparedStorageWrite { address, wipe_storage, storage } in storage {
// Wipe storage if flagged
@@ -723,6 +726,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
) -> ProviderResult<Duration> {
let start = Instant::now();
self.tx_ref().prefault_arena_for_table::<tables::HashedAccounts>()?;
let mut cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in accounts {
if let Some(account) = account {
@@ -746,6 +750,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
) -> ProviderResult<Duration> {
let start = Instant::now();
self.tx_ref().prefault_arena_for_table::<tables::HashedStorages>()?;
let mut cursor = self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
for (hashed_address, storage) in storages {
// Wipe storage if flagged
@@ -783,6 +788,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let start = Instant::now();
let mut num_entries = 0;
self.tx_ref().prefault_arena_for_table::<tables::AccountsTrie>()?;
let mut cursor = self.tx_ref().cursor_write::<tables::AccountsTrie>()?;
for (key, updated_node) in nodes {
let nibbles = StoredNibbles(*key);
@@ -818,6 +824,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let mut num_entries = 0;
let mut total_op_counts = StorageTrieOpCounts::default();
self.tx_ref().prefault_arena_for_table::<tables::StoragesTrie>()?;
let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
for (hashed_address, storage_trie_updates) in tries {
let mut db_storage_trie_cursor =