perf(libmdbx): pool read-only transaction handles to avoid reader table mutex (#22631)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
This commit is contained in:
Derek Cofausper
2026-02-27 18:58:17 -08:00
committed by GitHub
parent 614a68532b
commit 7bb5c579e0
7 changed files with 497 additions and 52 deletions

106
Cargo.lock generated
View File

@@ -1038,7 +1038,7 @@ version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
@@ -1049,7 +1049,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
@@ -2408,7 +2408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -2657,6 +2657,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -2756,7 +2765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162"
dependencies = [
"dispatch2",
"nix 0.31.1",
"nix 0.31.2",
"windows-sys 0.61.2",
]
@@ -2944,7 +2953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [
"data-encoding",
"syn 1.0.109",
"syn 2.0.117",
]
[[package]]
@@ -3434,7 +3443,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -4248,7 +4257,7 @@ dependencies = [
"libc",
"log",
"rustversion",
"windows-link 0.1.3",
"windows-link 0.2.1",
"windows-result 0.4.1",
]
@@ -5134,7 +5143,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
dependencies = [
"hermit-abi",
"libc",
"windows-sys 0.52.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -5227,9 +5236,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.90"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6"
checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
dependencies = [
"once_cell",
"wasm-bindgen",
@@ -5501,9 +5510,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.180"
version = "0.2.182"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc"
checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112"
[[package]]
name = "libgit2-sys"
@@ -5571,7 +5580,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616"
dependencies = [
"bitflags 2.11.0",
"libc",
"redox_syscall 0.7.2",
"redox_syscall 0.7.3",
]
[[package]]
@@ -5629,9 +5638,9 @@ dependencies = [
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "litemap"
@@ -6038,9 +6047,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.31.1"
version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66"
checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3"
dependencies = [
"bitflags 2.11.0",
"cfg-if",
@@ -6100,7 +6109,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
@@ -6718,18 +6727,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.1.10"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
version = "1.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6"
dependencies = [
"proc-macro2",
"quote",
@@ -6738,9 +6747,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.16"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pin-utils"
@@ -7370,9 +7379,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
version = "0.7.2"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d94dd2f7cd932d4dc02cc8b2b50dfd38bd079a4e5d79198b99743d7fcf9a4b4"
checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16"
dependencies = [
"bitflags 2.11.0",
]
@@ -7634,7 +7643,7 @@ dependencies = [
"csv",
"ctrlc",
"eyre",
"nix 0.31.1",
"nix 0.31.2",
"reth-chainspec",
"reth-cli-runner",
"reth-cli-util",
@@ -8938,6 +8947,7 @@ version = "1.11.1"
dependencies = [
"bitflags 2.11.0",
"byteorder",
"crossbeam-queue",
"dashmap",
"derive_more",
"parking_lot",
@@ -11015,15 +11025,15 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.1.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags 2.11.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -11081,7 +11091,7 @@ dependencies = [
"security-framework",
"security-framework-sys",
"webpki-root-certs 0.26.11",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -11855,15 +11865,15 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.25.0"
version = "3.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1"
checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0"
dependencies = [
"fastrand 2.3.0",
"getrandom 0.4.1",
"once_cell",
"rustix",
"windows-sys 0.52.0",
"windows-sys 0.61.2",
]
[[package]]
@@ -12575,7 +12585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f7c95348f20c1c913d72157b3c6dee6ea3e30b3d19502c5a7f6d3f160dacbf"
dependencies = [
"cc",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -12962,9 +12972,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2"
checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
dependencies = [
"cfg-if",
"once_cell",
@@ -12975,9 +12985,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.63"
version = "0.4.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a89f4650b770e4521aa6573724e2aed4704372151bd0de9d16a3bbabb87441a"
checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8"
dependencies = [
"cfg-if",
"futures-util",
@@ -12989,9 +12999,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950"
checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -12999,9 +13009,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60"
checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
dependencies = [
"bumpalo",
"proc-macro2",
@@ -13012,9 +13022,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5"
checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
dependencies = [
"unicode-ident",
]
@@ -13082,9 +13092,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.90"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "705eceb4ce901230f8625bd1d665128056ccbe4b7408faa625eec1ba80f59a97"
checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -13164,7 +13174,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.61.2",
]
[[package]]

View File

@@ -660,6 +660,7 @@ cipher = "0.4.3"
comfy-table = "7.0"
concat-kdf = "0.1.0"
crossbeam-channel = "0.5.13"
crossbeam-queue = "0.3"
crossbeam-utils = "0.8"
crossterm = "0.29.0"
csv = "1.3.0"

View File

@@ -17,6 +17,7 @@ reth-mdbx-sys.workspace = true
bitflags.workspace = true
byteorder.workspace = true
derive_more.workspace = true
crossbeam-queue.workspace = true
parking_lot.workspace = true
smallvec.workspace = true
thiserror.workspace = true

View File

@@ -4,6 +4,7 @@ use crate::{
flags::EnvironmentFlags,
transaction::{RO, RW},
txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
txn_pool::ReadTxnPool,
Mode, SyncMode, Transaction, TransactionKind,
};
use byteorder::{ByteOrder, NativeEndian};
@@ -95,11 +96,23 @@ impl Environment {
}
/// Create a read-only transaction for use with the environment.
///
/// Reuses a previously-reset transaction handle from the internal pool when available,
/// avoiding the `lck_rdt_lock` mutex in MDBX's `mvcc_bind_slot` on each new transaction.
#[inline]
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
if let Some(txn_ptr) = self.inner.ro_txn_pool.pop() {
return Ok(Transaction::new_from_ptr(self.clone(), txn_ptr));
}
Transaction::new(self.clone())
}
/// Returns the read transaction pool.
#[inline]
pub(crate) fn ro_txn_pool(&self) -> &ReadTxnPool {
&self.inner.ro_txn_pool
}
/// Create a read-write transaction for use with the environment. This method will block while
/// there are any other read-write transactions open on the environment.
pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
@@ -239,10 +252,15 @@ struct EnvironmentInner {
env_kind: EnvironmentKind,
/// Transaction manager
txn_manager: TxnManager,
/// Pool of reset read-only transaction handles for reuse.
ro_txn_pool: ReadTxnPool,
}
impl Drop for EnvironmentInner {
fn drop(&mut self) {
// Abort all pooled read transactions before closing the environment.
self.ro_txn_pool.drain();
// Close open mdbx environment on drop
unsafe {
ffi::mdbx_env_close_ex(self.env, false);
@@ -750,7 +768,12 @@ impl EnvironmentBuilder {
}
};
let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
let env = EnvironmentInner {
env,
txn_manager,
env_kind: self.kind,
ro_txn_pool: ReadTxnPool::new(),
};
Ok(Environment { inner: Arc::new(env) })
}

View File

@@ -35,6 +35,7 @@ mod error;
mod flags;
mod transaction;
mod txn_manager;
mod txn_pool;
#[cfg(test)]
mod test_utils {

View File

@@ -350,9 +350,10 @@ where
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
unsafe {
ffi::mdbx_txn_abort(txn);
}
// Reset and return the handle to the pool for lock-free reuse.
// pool.put() calls mdbx_txn_reset internally and falls back to
// mdbx_txn_abort if the reset fails or the pool is full.
self.env.ro_txn_pool().push(txn);
} else {
let (sender, rx) = sync_channel(0);
self.env

View File

@@ -0,0 +1,408 @@
use crate::error::mdbx_result;
use crossbeam_queue::ArrayQueue;
/// Lock-free pool of reset read-only MDBX transaction handles.
///
/// With `MDBX_NOTLS` (which reth always sets), every `mdbx_txn_begin_ex` for a read transaction
/// calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high
/// concurrency (e.g., prewarming), this becomes a contention point.
///
/// This pool caches transaction handles that have been reset via `mdbx_txn_reset`. A reset handle
/// retains its reader slot, so `mdbx_txn_renew` can reactivate it without touching the reader
/// table mutex.
pub(crate) struct ReadTxnPool {
queue: ArrayQueue<PooledTxn>,
}
/// Wrapper around a raw txn pointer to satisfy `Send + Sync` for the queue.
struct PooledTxn(*mut ffi::MDBX_txn);
// SAFETY: MDBX txn pointers are safe to send across threads — we ensure exclusive
// ownership via the queue's push/pop semantics.
unsafe impl Send for PooledTxn {}
unsafe impl Sync for PooledTxn {}
impl ReadTxnPool {
pub(crate) fn new() -> Self {
Self { queue: ArrayQueue::new(256) }
}
/// Takes a reset transaction handle from the pool, renews it, and returns it ready for use.
///
/// Returns `None` if the pool is empty or all renew attempts fail.
pub(crate) fn pop(&self) -> Option<*mut ffi::MDBX_txn> {
while let Some(handle) = self.queue.pop() {
let txn = handle.0;
// SAFETY: this pointer was previously created by mdbx_txn_begin_ex and reset
// via mdbx_txn_reset. mdbx_txn_renew reuses the existing reader slot without
// taking lck_rdt_lock.
match mdbx_result(unsafe { ffi::mdbx_txn_renew(txn) }) {
Ok(_) => return Some(txn),
Err(e) => {
tracing::warn!(target: "libmdbx", %e, "failed to renew pooled read transaction");
abort_txn(txn);
}
}
}
None
}
/// Resets an active read transaction handle and returns it to the pool.
///
/// If reset fails, the handle is aborted instead.
pub(crate) fn push(&self, txn: *mut ffi::MDBX_txn) {
// mdbx_txn_reset releases the MVCC snapshot but keeps the reader slot.
if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn) }) {
tracing::warn!(target: "libmdbx", %e, "failed to reset read transaction for pooling");
abort_txn(txn);
return;
}
if self.queue.push(PooledTxn(txn)).is_err() {
abort_txn(txn);
}
}
/// Aborts all pooled transaction handles. Called during environment shutdown.
pub(crate) fn drain(&self) {
while let Some(handle) = self.queue.pop() {
abort_txn(handle.0);
}
}
}
/// Aborts a transaction handle, logging any error.
fn abort_txn(txn: *mut ffi::MDBX_txn) {
if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_abort(txn) }) {
tracing::error!(target: "libmdbx", %e, "failed to abort transaction");
}
}
impl Drop for ReadTxnPool {
fn drop(&mut self) {
self.drain();
}
}
#[cfg(test)]
mod tests {
use crate::{Environment, WriteFlags};
/// Opens a fresh test environment.
fn test_env() -> (tempfile::TempDir, Environment) {
let dir = tempfile::tempdir().unwrap();
let env = Environment::builder().open(dir.path()).unwrap();
(dir, env)
}
/// Inserts a single key so the database isn't empty.
fn seed(env: &Environment) {
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
tx.put(db.dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
tx.commit().unwrap();
}
#[test]
fn get_returns_none_when_empty() {
let (_dir, env) = test_env();
assert!(env.ro_txn_pool().pop().is_none());
}
#[test]
fn put_get_roundtrip() {
let (_dir, env) = test_env();
seed(&env);
// Open and drop a read txn — drop returns the handle to the pool.
let txn = env.begin_ro_txn().unwrap();
drop(txn);
// Next begin_ro_txn should reuse the pooled handle.
let txn = env.begin_ro_txn().unwrap();
let _id = txn.id().unwrap();
}
#[test]
fn pooled_txn_reads_latest_snapshot() {
let (_dir, env) = test_env();
seed(&env);
// Open a read txn and drop it to pool the handle.
let txn = env.begin_ro_txn().unwrap();
drop(txn);
// Write new data.
{
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
tx.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap();
tx.commit().unwrap();
}
// The renewed pooled txn must see the new data.
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 4]> = txn.get(db.dbi(), b"key2").unwrap();
assert_eq!(val, Some(*b"val2"));
}
#[test]
fn multiple_put_get_cycles() {
let (_dir, env) = test_env();
seed(&env);
for _ in 0..50 {
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
drop(txn);
}
}
#[test]
fn concurrent_txns_pool_multiple_handles() {
let (_dir, env) = test_env();
seed(&env);
// Open several txns concurrently — each gets a fresh handle.
let txns: Vec<_> = (0..8).map(|_| env.begin_ro_txn().unwrap()).collect();
// Drop them all — pool should accumulate handles.
let count = txns.len();
drop(txns);
// Reopen same number — all should come from the pool.
let txns: Vec<_> = (0..count).map(|_| env.begin_ro_txn().unwrap()).collect();
for txn in &txns {
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
}
}
#[test]
fn drain_empties_pool() {
let (_dir, env) = test_env();
seed(&env);
// Pool a handle.
let txn = env.begin_ro_txn().unwrap();
drop(txn);
env.ro_txn_pool().drain();
// Pool is empty — get should return None.
assert!(env.ro_txn_pool().pop().is_none());
}
#[test]
fn committed_txn_is_not_pooled() {
let (_dir, env) = test_env();
seed(&env);
let txn = env.begin_ro_txn().unwrap();
txn.commit().unwrap();
// Committed txns are freed by mdbx, not returned to pool.
assert!(env.ro_txn_pool().pop().is_none());
}
#[test]
fn multithreaded_pool_usage() {
let (_dir, env) = test_env();
seed(&env);
let env = std::sync::Arc::new(env);
let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));
let handles: Vec<_> = (0..8)
.map(|_| {
let env = env.clone();
let barrier = barrier.clone();
std::thread::spawn(move || {
barrier.wait();
for _ in 0..100 {
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
drop(txn);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn multithreaded_mixed_read_write() {
let (_dir, env) = test_env();
seed(&env);
let env = std::sync::Arc::new(env);
let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));
// Spawn reader threads.
let mut handles: Vec<_> = (0..4)
.map(|_| {
let env = env.clone();
let barrier = barrier.clone();
std::thread::spawn(move || {
barrier.wait();
for _ in 0..50 {
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
// key may or may not exist depending on writer timing.
let _val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
drop(txn);
}
})
})
.collect();
// Spawn a writer thread.
handles.push(std::thread::spawn(move || {
barrier.wait();
for i in 0u32..20 {
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
tx.put(db.dbi(), i.to_le_bytes(), b"v", WriteFlags::empty()).unwrap();
tx.commit().unwrap();
}
}));
for h in handles {
h.join().unwrap();
}
}
#[test]
fn multithreaded_concurrent_open_close() {
let (_dir, env) = test_env();
seed(&env);
let env = std::sync::Arc::new(env);
let barrier = std::sync::Arc::new(std::sync::Barrier::new(16));
// 16 threads each open and close 200 txns — exercises pool contention.
let handles: Vec<_> = (0..16)
.map(|_| {
let env = env.clone();
let barrier = barrier.clone();
std::thread::spawn(move || {
barrier.wait();
for _ in 0..200 {
let txn = env.begin_ro_txn().unwrap();
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
// Intentionally don't call drop explicitly — let scope handle it.
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn multithreaded_hold_multiple_txns() {
let (_dir, env) = test_env();
seed(&env);
let env = std::sync::Arc::new(env);
let barrier = std::sync::Arc::new(std::sync::Barrier::new(8));
// Each thread holds multiple txns open simultaneously, then drops them all.
let handles: Vec<_> = (0..8)
.map(|_| {
let env = env.clone();
let barrier = barrier.clone();
std::thread::spawn(move || {
barrier.wait();
for _ in 0..20 {
let txns: Vec<_> = (0..4).map(|_| env.begin_ro_txn().unwrap()).collect();
for txn in &txns {
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
}
drop(txns);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn multithreaded_drain_under_contention() {
let (_dir, env) = test_env();
seed(&env);
let env = std::sync::Arc::new(env);
// Fill the pool with handles.
{
let txns: Vec<_> = (0..16).map(|_| env.begin_ro_txn().unwrap()).collect();
drop(txns);
}
let barrier = std::sync::Arc::new(std::sync::Barrier::new(5));
// 4 threads racing to get from pool + 1 thread draining.
let mut handles: Vec<_> = (0..4)
.map(|_| {
let env = env.clone();
let barrier = barrier.clone();
std::thread::spawn(move || {
barrier.wait();
for _ in 0..50 {
let _txn = env.begin_ro_txn(); // may or may not get a pooled handle
}
})
})
.collect();
handles.push(std::thread::spawn(move || {
barrier.wait();
env.ro_txn_pool().drain();
}));
for h in handles {
h.join().unwrap();
}
}
#[test]
fn pool_overflow_aborts_excess() {
let dir = tempfile::tempdir().unwrap();
let env = Environment::builder().set_max_readers(512).open(dir.path()).unwrap();
seed(&env);
// Open more txns than the pool capacity (256), drop them all.
let txns: Vec<_> = (0..300).map(|_| env.begin_ro_txn().unwrap()).collect();
drop(txns);
// Pool is capped at 256; excess handles are aborted.
assert_eq!(env.ro_txn_pool().queue.len(), 256);
// All 256 pooled handles should still work.
let txns: Vec<_> = (0..256).map(|_| env.begin_ro_txn().unwrap()).collect();
for txn in &txns {
let db = txn.open_db(None).unwrap();
let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap();
assert_eq!(val, Some(*b"val"));
}
}
}