mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(mdbx): per-subtxn io_uring prefault instead of batch
- Each subtxn now prefaults its OWN arena via io_uring in parallel - 7 subtxn threads = 7 io_uring rings = true parallel I/O - Prefault called at start of each write_*_only method - Removes blocking batch prefault from mdbx_txn_create_subtxns - Soft fail on io_uring init (optimization, not required) Before: main thread prefaults all 30K pages serially, then spawns threads After: each thread prefaults its ~4K pages in parallel with siblings
This commit is contained in:
@@ -153,4 +153,15 @@ pub trait DbTxMut: Send {
|
||||
/// Tracks whether arena hint estimation is working or always hitting floor/cap.
|
||||
/// This is a no-op by default; implementations may override to record metrics.
|
||||
fn record_arena_estimation(&self, _table: &'static str, _stats: &ArenaHintEstimationStats) {}
|
||||
|
||||
/// Prefaults the arena for the subtransaction bound to the given table.
|
||||
///
|
||||
/// Call this at the start of subtxn work to overlap I/O with sibling subtxns.
|
||||
/// Each subtxn thread should call this - they prefault in parallel via `io_uring`.
|
||||
///
|
||||
/// Returns `Ok(true)` if prefault was performed, `Ok(false)` if no subtxn exists for
|
||||
/// this table (e.g., parallel writes not enabled).
|
||||
fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -568,6 +568,18 @@ impl Tx<RW> {
|
||||
handler.env_metrics.record_arena_estimation(table, stats);
|
||||
}
|
||||
}
|
||||
|
||||
/// Prefaults the arena for the subtransaction bound to the given table using `io_uring`.
|
||||
///
|
||||
/// Call this at the start of subtxn work to overlap I/O with sibling subtxns.
|
||||
/// Each subtxn thread should call this - they prefault in parallel.
|
||||
///
|
||||
/// Returns `Ok(true)` if prefault was performed, `Ok(false)` if no subtxn exists for
|
||||
/// this table (e.g., parallel writes not enabled or table not found).
|
||||
pub fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
|
||||
let dbi = self.get_dbi::<T>()?;
|
||||
self.inner.prefault_arena_for_dbi(dbi).map_err(|e| DatabaseError::InitCursor(e.into()))
|
||||
}
|
||||
}
|
||||
|
||||
impl DbTxMut for Tx<RW> {
|
||||
@@ -664,6 +676,10 @@ impl DbTxMut for Tx<RW> {
|
||||
) {
|
||||
Tx::record_arena_estimation(self, table, stats)
|
||||
}
|
||||
|
||||
fn prefault_arena_for_table<T: Table>(&self) -> Result<bool, DatabaseError> {
|
||||
Tx::prefault_arena_for_table::<T>(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
117
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
117
crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c
vendored
@@ -15896,88 +15896,83 @@ static int create_subtxn_with_dbi(MDBX_txn *parent, MDBX_dbi dbi, MDBX_txn **sub
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
/* Batch prefault arena pages for all subtxns using io_uring.
|
||||
* Submits writes for all pages in all subtxns' repnl lists in one batch.
|
||||
* Requires io_uring - will abort if not available. */
|
||||
static void subtxn_batch_prefault_arena(MDBX_env *env, MDBX_txn **subtxns, size_t count) {
|
||||
#if !MDBX_HAVE_IO_URING
|
||||
(void)env; (void)subtxns; (void)count;
|
||||
FATAL("%s", "io_uring required for parallel subtxn prefault");
|
||||
abort();
|
||||
#else
|
||||
/* Count total pages across all subtxns */
|
||||
size_t total_pages = 0;
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
MDBX_txn *txn = subtxns[i];
|
||||
if (txn && txn->tw.subtxn_repnl)
|
||||
total_pages += MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
|
||||
}
|
||||
|
||||
if (total_pages == 0)
|
||||
/* Prefault this subtxn's arena pages using io_uring.
|
||||
* Called by each subtxn thread at the start of its work.
|
||||
* Each thread has its own io_uring ring - true parallel I/O. */
|
||||
static void subtxn_prefault_own_arena(MDBX_txn *subtxn) {
|
||||
if (!(subtxn->flags & txn_parallel_subtx))
|
||||
return;
|
||||
if (!subtxn->tw.subtxn_repnl)
|
||||
return;
|
||||
|
||||
DEBUG("io_uring batch prefault: %zu pages across %zu subtxns", total_pages, count);
|
||||
MDBX_env *env = subtxn->env;
|
||||
if (!subtxn->tw.prefault_write_activated)
|
||||
return;
|
||||
|
||||
const size_t npages = MDBX_PNL_GETSIZE(subtxn->tw.subtxn_repnl);
|
||||
if (npages == 0)
|
||||
return;
|
||||
|
||||
#if !MDBX_HAVE_IO_URING
|
||||
return; /* Silently skip on non-Linux */
|
||||
#else
|
||||
DEBUG("io_uring per-subtxn prefault: %zu pages for subtxn %p", npages, (void *)subtxn);
|
||||
|
||||
/* Get dirty pattern buffer (offset by ps*2 from page_auxbuf) */
|
||||
void *const pattern = ptr_disp(env->page_auxbuf, env->ps * 2);
|
||||
|
||||
/* Cap ring size - kernel has limits, we'll submit in batches if needed */
|
||||
const size_t ring_size = total_pages < 32768 ? total_pages : 32768;
|
||||
|
||||
const size_t ring_size = npages < 32768 ? npages : 32768;
|
||||
struct io_uring ring;
|
||||
int ret = io_uring_queue_init(ring_size, &ring, 0);
|
||||
if (ret < 0) {
|
||||
FATAL("io_uring_queue_init(%zu) failed: %s (total_pages=%zu)",
|
||||
ring_size, strerror(-ret), total_pages);
|
||||
abort();
|
||||
}
|
||||
if (ret < 0)
|
||||
return; /* Soft fail - prefetch is optimization */
|
||||
|
||||
/* Submit write operations for all scattered pages */
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
MDBX_txn *txn = subtxns[i];
|
||||
if (!txn || !txn->tw.subtxn_repnl)
|
||||
continue;
|
||||
|
||||
const size_t npages = MDBX_PNL_GETSIZE(txn->tw.subtxn_repnl);
|
||||
for (size_t j = 0; j < npages; ++j) {
|
||||
pgno_t pgno = txn->tw.subtxn_repnl[j + 1]; /* PNL is 1-indexed */
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
|
||||
if (unlikely(!sqe)) {
|
||||
/* SQ full, submit and continue */
|
||||
io_uring_submit(&ring);
|
||||
sqe = io_uring_get_sqe(&ring);
|
||||
if (!sqe)
|
||||
continue; /* Skip this page on failure */
|
||||
}
|
||||
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
|
||||
pgno2bytes(env, pgno));
|
||||
for (size_t i = 0; i < npages; ++i) {
|
||||
pgno_t pgno = subtxn->tw.subtxn_repnl[i + 1];
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
|
||||
if (unlikely(!sqe)) {
|
||||
io_uring_submit(&ring);
|
||||
sqe = io_uring_get_sqe(&ring);
|
||||
if (!sqe)
|
||||
continue;
|
||||
}
|
||||
io_uring_prep_write(sqe, env->lazy_fd, pattern, env->ps,
|
||||
pgno2bytes(env, pgno));
|
||||
}
|
||||
|
||||
/* Submit all pending operations */
|
||||
io_uring_submit(&ring);
|
||||
|
||||
/* Wait for all completions */
|
||||
struct io_uring_cqe *cqe;
|
||||
for (size_t i = 0; i < total_pages; ++i) {
|
||||
for (size_t i = 0; i < npages; ++i) {
|
||||
if (io_uring_wait_cqe(&ring, &cqe) < 0)
|
||||
break;
|
||||
io_uring_cqe_seen(&ring, cqe);
|
||||
}
|
||||
|
||||
io_uring_queue_exit(&ring);
|
||||
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
if (subtxns[i])
|
||||
subtxns[i]->tw.subtxn_arena_prefaulted = true;
|
||||
}
|
||||
subtxn->tw.subtxn_arena_prefaulted = true;
|
||||
|
||||
#if MDBX_ENABLE_PGOP_STAT
|
||||
env->lck->pgops.prefault.weak += total_pages;
|
||||
#endif /* MDBX_ENABLE_PGOP_STAT */
|
||||
env->lck->pgops.prefault.weak += npages;
|
||||
#endif
|
||||
#endif /* MDBX_HAVE_IO_URING */
|
||||
}
|
||||
|
||||
/* Public API for Rust to trigger per-subtxn arena prefault.
|
||||
* Call this at the start of each subtxn thread's work. */
|
||||
LIBMDBX_API int mdbx_subtxn_prefault_arena(MDBX_txn *txn) {
|
||||
if (unlikely(!txn))
|
||||
return MDBX_EINVAL;
|
||||
if (unlikely(txn->signature != txn_signature))
|
||||
return MDBX_EBADSIGN;
|
||||
if (!(txn->flags & txn_parallel_subtx))
|
||||
return MDBX_EINVAL;
|
||||
if (txn->tw.subtxn_arena_prefaulted)
|
||||
return MDBX_SUCCESS; /* Already done */
|
||||
|
||||
subtxn_prefault_own_arena(txn);
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
/* mdbx_txn_create_subtxns - Create parallel sibling subtransactions.
|
||||
*
|
||||
* INVARIANTS:
|
||||
@@ -16282,11 +16277,9 @@ LIBMDBX_API int mdbx_txn_create_subtxns(MDBX_txn *parent,
|
||||
|
||||
osal_free(gc_alloc);
|
||||
|
||||
/* Batch prefault all arena pages before subtxns start writing.
|
||||
* This front-loads page residency, eliminating per-page mincore checks
|
||||
* during gc_alloc_single. Only prefault if activated for this txn. */
|
||||
if (parent->tw.prefault_write_activated)
|
||||
subtxn_batch_prefault_arena(parent->env, subtxns, count);
|
||||
/* Arena prefault is now per-subtxn via mdbx_subtxn_prefault_arena().
|
||||
* Rust calls this at the start of each subtxn thread's work,
|
||||
* enabling true parallel I/O with per-thread io_uring rings. */
|
||||
|
||||
return MDBX_SUCCESS;
|
||||
}
|
||||
|
||||
@@ -6771,6 +6771,16 @@ typedef struct MDBX_subtxn_stats {
|
||||
*/
|
||||
LIBMDBX_API int mdbx_subtxn_get_stats(const MDBX_txn *subtxn, MDBX_subtxn_stats *stats);
|
||||
|
||||
/** \brief Prefault this subtxn's arena pages using io_uring.
|
||||
*
|
||||
* Call this at the start of subtxn work to overlap I/O with sibling subtxns.
|
||||
* Each subtxn thread should call this - they prefault in parallel.
|
||||
*
|
||||
* \param [in] txn A valid subtransaction handle.
|
||||
* \returns A non-zero error value on failure and 0 on success.
|
||||
*/
|
||||
LIBMDBX_API int mdbx_subtxn_prefault_arena(MDBX_txn *txn);
|
||||
|
||||
/** end of c_parallel @} */
|
||||
|
||||
/** end of c_api @} */
|
||||
|
||||
@@ -162,6 +162,17 @@ impl SubTransaction {
|
||||
})
|
||||
})?
|
||||
}
|
||||
|
||||
/// Prefault this subtxn's arena pages using `io_uring`.
|
||||
///
|
||||
/// Call this at the start of subtxn work to overlap I/O with sibling subtxns.
|
||||
/// Each subtxn thread should call this - they prefault in parallel.
|
||||
pub fn prefault_arena(&self) -> Result<()> {
|
||||
self.txn_ptr.txn_execute_fail_on_timeout(|ptr| {
|
||||
mdbx_result(unsafe { ffi::mdbx_subtxn_prefault_arena(ptr) })
|
||||
})??;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> Transaction<K>
|
||||
@@ -877,6 +888,27 @@ impl Transaction<RW> {
|
||||
self.inner.txn.clone()
|
||||
}
|
||||
|
||||
/// Prefaults the arena for the subtransaction bound to the given DBI using `io_uring`.
|
||||
///
|
||||
/// Call this at the start of subtxn work to overlap I/O with sibling subtxns.
|
||||
/// Each subtxn thread should call this - they prefault in parallel.
|
||||
///
|
||||
/// Returns `Ok(true)` if prefault was performed, `Ok(false)` if no subtxn exists for
|
||||
/// this DBI (e.g., parallel writes not enabled).
|
||||
pub fn prefault_arena_for_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<bool> {
|
||||
if !self.parallel_writes_enabled.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let subtxns = self.subtxns.read();
|
||||
if let Some(subtxn) = subtxns.get(&dbi) {
|
||||
subtxn.prefault_arena()?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// Aborts all subtransactions.
|
||||
///
|
||||
/// This discards all changes made through subtransactions.
|
||||
|
||||
@@ -652,6 +652,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
) -> ProviderResult<Duration> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::PlainAccountState>()?;
|
||||
let mut cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
|
||||
for (address, account) in accounts {
|
||||
if let Some(account) = account {
|
||||
@@ -671,6 +672,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
pub fn write_bytecodes_only(&self, contracts: &[(B256, Bytecode)]) -> ProviderResult<Duration> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::Bytecodes>()?;
|
||||
let mut cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
|
||||
for (hash, bytecode) in contracts {
|
||||
cursor.upsert(*hash, bytecode)?;
|
||||
@@ -689,6 +691,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
) -> ProviderResult<Duration> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::PlainStorageState>()?;
|
||||
let mut cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
|
||||
for PreparedStorageWrite { address, wipe_storage, storage } in storage {
|
||||
// Wipe storage if flagged
|
||||
@@ -723,6 +726,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
) -> ProviderResult<Duration> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::HashedAccounts>()?;
|
||||
let mut cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
|
||||
for (hashed_address, account) in accounts {
|
||||
if let Some(account) = account {
|
||||
@@ -746,6 +750,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
) -> ProviderResult<Duration> {
|
||||
let start = Instant::now();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::HashedStorages>()?;
|
||||
let mut cursor = self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
|
||||
for (hashed_address, storage) in storages {
|
||||
// Wipe storage if flagged
|
||||
@@ -783,6 +788,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
let start = Instant::now();
|
||||
let mut num_entries = 0;
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::AccountsTrie>()?;
|
||||
let mut cursor = self.tx_ref().cursor_write::<tables::AccountsTrie>()?;
|
||||
for (key, updated_node) in nodes {
|
||||
let nibbles = StoredNibbles(*key);
|
||||
@@ -818,6 +824,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
let mut num_entries = 0;
|
||||
let mut total_op_counts = StorageTrieOpCounts::default();
|
||||
|
||||
self.tx_ref().prefault_arena_for_table::<tables::StoragesTrie>()?;
|
||||
let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
|
||||
for (hashed_address, storage_trie_updates) in tries {
|
||||
let mut db_storage_trie_cursor =
|
||||
|
||||
Reference in New Issue
Block a user