diff --git a/Cargo.lock b/Cargo.lock index 7798f06970..20868b7014 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1038,7 +1038,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1049,7 +1049,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2408,7 +2408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2657,6 +2657,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2756,7 +2765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" dependencies = [ "dispatch2", - "nix 0.31.1", + "nix 0.31.2", "windows-sys 0.61.2", ] @@ -2944,7 +2953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.117", ] [[package]] @@ -3434,7 +3443,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4248,7 +4257,7 @@ dependencies = [ "libc", "log", "rustversion", - "windows-link 0.1.3", + "windows-link 0.2.1", "windows-result 0.4.1", ] @@ -5134,7 +5143,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5227,9 +5236,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.90" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" dependencies = [ "once_cell", "wasm-bindgen", @@ -5501,9 +5510,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.180" +version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" [[package]] name = "libgit2-sys" @@ -5571,7 +5580,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags 2.11.0", "libc", - "redox_syscall 0.7.2", + "redox_syscall 0.7.3", ] [[package]] @@ -5629,9 +5638,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" @@ -6038,9 +6047,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.31.1" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ "bitflags 2.11.0", "cfg-if", @@ -6100,7 +6109,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -6718,18 +6727,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.10" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.10" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", @@ -6738,9 +6747,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" [[package]] name = "pin-utils" @@ -7370,9 +7379,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d94dd2f7cd932d4dc02cc8b2b50dfd38bd079a4e5d79198b99743d7fcf9a4b4" +checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ "bitflags 2.11.0", ] @@ -7634,7 +7643,7 @@ dependencies = [ "csv", "ctrlc", "eyre", - "nix 0.31.1", + "nix 0.31.2", "reth-chainspec", "reth-cli-runner", "reth-cli-util", @@ -8938,6 +8947,7 @@ version = "1.11.1" dependencies = [ "bitflags 2.11.0", "byteorder", + "crossbeam-queue", "dashmap", "derive_more", "parking_lot", @@ -11015,15 +11025,15 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ "bitflags 2.11.0", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -11081,7 +11091,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs 0.26.11", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -11855,15 +11865,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.25.0" +version = "3.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" dependencies = [ "fastrand 2.3.0", "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -12575,7 +12585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5f7c95348f20c1c913d72157b3c6dee6ea3e30b3d19502c5a7f6d3f160dacbf" dependencies = [ "cc", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -12962,9 +12972,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" dependencies = [ "cfg-if", "once_cell", @@ -12975,9 +12985,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.63" +version = "0.4.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a89f4650b770e4521aa6573724e2aed4704372151bd0de9d16a3bbabb87441a" +checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" dependencies = [ "cfg-if", "futures-util", @@ -12989,9 +12999,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -12999,9 +13009,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" dependencies = [ "bumpalo", "proc-macro2", @@ -13012,9 +13022,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" dependencies = [ "unicode-ident", ] @@ -13082,9 +13092,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.90" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705eceb4ce901230f8625bd1d665128056ccbe4b7408faa625eec1ba80f59a97" +checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" dependencies = [ "js-sys", "wasm-bindgen", @@ -13164,7 +13174,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7d71afa919..6bf3a96698 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -660,6 +660,7 @@ cipher = "0.4.3" comfy-table = "7.0" concat-kdf = "0.1.0" crossbeam-channel = "0.5.13" +crossbeam-queue = "0.3" crossbeam-utils = "0.8" crossterm = "0.29.0" csv = "1.3.0" diff --git a/crates/storage/libmdbx-rs/Cargo.toml b/crates/storage/libmdbx-rs/Cargo.toml index 4a1d6aa010..540bdedd33 100644 --- a/crates/storage/libmdbx-rs/Cargo.toml +++ b/crates/storage/libmdbx-rs/Cargo.toml @@ -17,6 +17,7 @@ reth-mdbx-sys.workspace = true bitflags.workspace = true byteorder.workspace = true derive_more.workspace = true +crossbeam-queue.workspace = true parking_lot.workspace = true smallvec.workspace = true thiserror.workspace = true diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index 62fd272aa9..0154d09703 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -4,6 +4,7 @@ use crate::{ flags::EnvironmentFlags, transaction::{RO, RW}, txn_manager::{TxnManager, TxnManagerMessage, TxnPtr}, + txn_pool::ReadTxnPool, Mode, SyncMode, Transaction, TransactionKind, }; use byteorder::{ByteOrder, NativeEndian}; @@ -95,11 +96,23 @@ impl Environment { } /// Create a read-only transaction for use with the environment. + /// + /// Reuses a previously-reset transaction handle from the internal pool when available, + /// avoiding the `lck_rdt_lock` mutex in MDBX's `mvcc_bind_slot` on each new transaction. #[inline] pub fn begin_ro_txn(&self) -> Result> { + if let Some(txn_ptr) = self.inner.ro_txn_pool.pop() { + return Ok(Transaction::new_from_ptr(self.clone(), txn_ptr)); + } Transaction::new(self.clone()) } + /// Returns the read transaction pool. + #[inline] + pub(crate) fn ro_txn_pool(&self) -> &ReadTxnPool { + &self.inner.ro_txn_pool + } + /// Create a read-write transaction for use with the environment. This method will block while /// there are any other read-write transactions open on the environment. pub fn begin_rw_txn(&self) -> Result> { @@ -239,10 +252,15 @@ struct EnvironmentInner { env_kind: EnvironmentKind, /// Transaction manager txn_manager: TxnManager, + /// Pool of reset read-only transaction handles for reuse. + ro_txn_pool: ReadTxnPool, } impl Drop for EnvironmentInner { fn drop(&mut self) { + // Abort all pooled read transactions before closing the environment. + self.ro_txn_pool.drain(); + // Close open mdbx environment on drop unsafe { ffi::mdbx_env_close_ex(self.env, false); @@ -750,7 +768,12 @@ impl EnvironmentBuilder { } }; - let env = EnvironmentInner { env, txn_manager, env_kind: self.kind }; + let env = EnvironmentInner { + env, + txn_manager, + env_kind: self.kind, + ro_txn_pool: ReadTxnPool::new(), + }; Ok(Environment { inner: Arc::new(env) }) } diff --git a/crates/storage/libmdbx-rs/src/lib.rs b/crates/storage/libmdbx-rs/src/lib.rs index 88fe8e3e7c..728ccb337b 100644 --- a/crates/storage/libmdbx-rs/src/lib.rs +++ b/crates/storage/libmdbx-rs/src/lib.rs @@ -35,6 +35,7 @@ mod error; mod flags; mod transaction; mod txn_manager; +mod txn_pool; #[cfg(test)] mod test_utils { diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 4d7c7ea979..c3c907eeee 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -350,9 +350,10 @@ where #[cfg(feature = "read-tx-timeouts")] self.env.txn_manager().remove_active_read_transaction(txn); - unsafe { - ffi::mdbx_txn_abort(txn); - } + // Reset and return the handle to the pool for lock-free reuse. + // pool.put() calls mdbx_txn_reset internally and falls back to + // mdbx_txn_abort if the reset fails or the pool is full. + self.env.ro_txn_pool().push(txn); } else { let (sender, rx) = sync_channel(0); self.env diff --git a/crates/storage/libmdbx-rs/src/txn_pool.rs b/crates/storage/libmdbx-rs/src/txn_pool.rs new file mode 100644 index 0000000000..f8d09a3a6b --- /dev/null +++ b/crates/storage/libmdbx-rs/src/txn_pool.rs @@ -0,0 +1,408 @@ +use crate::error::mdbx_result; +use crossbeam_queue::ArrayQueue; + +/// Lock-free pool of reset read-only MDBX transaction handles. +/// +/// With `MDBX_NOTLS` (which reth always sets), every `mdbx_txn_begin_ex` for a read transaction +/// calls `mvcc_bind_slot`, which acquires `lck_rdt_lock` — a pthread mutex. Under high +/// concurrency (e.g., prewarming), this becomes a contention point. +/// +/// This pool caches transaction handles that have been reset via `mdbx_txn_reset`. A reset handle +/// retains its reader slot, so `mdbx_txn_renew` can reactivate it without touching the reader +/// table mutex. +pub(crate) struct ReadTxnPool { + queue: ArrayQueue, +} + +/// Wrapper around a raw txn pointer to satisfy `Send + Sync` for the queue. +struct PooledTxn(*mut ffi::MDBX_txn); + +// SAFETY: MDBX txn pointers are safe to send across threads — we ensure exclusive +// ownership via the queue's push/pop semantics. +unsafe impl Send for PooledTxn {} +unsafe impl Sync for PooledTxn {} + +impl ReadTxnPool { + pub(crate) fn new() -> Self { + Self { queue: ArrayQueue::new(256) } + } + + /// Takes a reset transaction handle from the pool, renews it, and returns it ready for use. + /// + /// Returns `None` if the pool is empty or all renew attempts fail. + pub(crate) fn pop(&self) -> Option<*mut ffi::MDBX_txn> { + while let Some(handle) = self.queue.pop() { + let txn = handle.0; + // SAFETY: this pointer was previously created by mdbx_txn_begin_ex and reset + // via mdbx_txn_reset. mdbx_txn_renew reuses the existing reader slot without + // taking lck_rdt_lock. + match mdbx_result(unsafe { ffi::mdbx_txn_renew(txn) }) { + Ok(_) => return Some(txn), + Err(e) => { + tracing::warn!(target: "libmdbx", %e, "failed to renew pooled read transaction"); + abort_txn(txn); + } + } + } + None + } + + /// Resets an active read transaction handle and returns it to the pool. + /// + /// If reset fails, the handle is aborted instead. + pub(crate) fn push(&self, txn: *mut ffi::MDBX_txn) { + // mdbx_txn_reset releases the MVCC snapshot but keeps the reader slot. + if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn) }) { + tracing::warn!(target: "libmdbx", %e, "failed to reset read transaction for pooling"); + abort_txn(txn); + return; + } + + if self.queue.push(PooledTxn(txn)).is_err() { + abort_txn(txn); + } + } + + /// Aborts all pooled transaction handles. Called during environment shutdown. + pub(crate) fn drain(&self) { + while let Some(handle) = self.queue.pop() { + abort_txn(handle.0); + } + } +} + +/// Aborts a transaction handle, logging any error. +fn abort_txn(txn: *mut ffi::MDBX_txn) { + if let Err(e) = mdbx_result(unsafe { ffi::mdbx_txn_abort(txn) }) { + tracing::error!(target: "libmdbx", %e, "failed to abort transaction"); + } +} + +impl Drop for ReadTxnPool { + fn drop(&mut self) { + self.drain(); + } +} + +#[cfg(test)] +mod tests { + use crate::{Environment, WriteFlags}; + + /// Opens a fresh test environment. + fn test_env() -> (tempfile::TempDir, Environment) { + let dir = tempfile::tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + (dir, env) + } + + /// Inserts a single key so the database isn't empty. + fn seed(env: &Environment) { + let tx = env.begin_rw_txn().unwrap(); + let db = tx.open_db(None).unwrap(); + tx.put(db.dbi(), b"key", b"val", WriteFlags::empty()).unwrap(); + tx.commit().unwrap(); + } + + #[test] + fn get_returns_none_when_empty() { + let (_dir, env) = test_env(); + assert!(env.ro_txn_pool().pop().is_none()); + } + + #[test] + fn put_get_roundtrip() { + let (_dir, env) = test_env(); + seed(&env); + + // Open and drop a read txn — drop returns the handle to the pool. + let txn = env.begin_ro_txn().unwrap(); + drop(txn); + + // Next begin_ro_txn should reuse the pooled handle. + let txn = env.begin_ro_txn().unwrap(); + let _id = txn.id().unwrap(); + } + + #[test] + fn pooled_txn_reads_latest_snapshot() { + let (_dir, env) = test_env(); + seed(&env); + + // Open a read txn and drop it to pool the handle. + let txn = env.begin_ro_txn().unwrap(); + drop(txn); + + // Write new data. + { + let tx = env.begin_rw_txn().unwrap(); + let db = tx.open_db(None).unwrap(); + tx.put(db.dbi(), b"key2", b"val2", WriteFlags::empty()).unwrap(); + tx.commit().unwrap(); + } + + // The renewed pooled txn must see the new data. + let txn = env.begin_ro_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 4]> = txn.get(db.dbi(), b"key2").unwrap(); + assert_eq!(val, Some(*b"val2")); + } + + #[test] + fn multiple_put_get_cycles() { + let (_dir, env) = test_env(); + seed(&env); + + for _ in 0..50 { + let txn = env.begin_ro_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + drop(txn); + } + } + + #[test] + fn concurrent_txns_pool_multiple_handles() { + let (_dir, env) = test_env(); + seed(&env); + + // Open several txns concurrently — each gets a fresh handle. + let txns: Vec<_> = (0..8).map(|_| env.begin_ro_txn().unwrap()).collect(); + + // Drop them all — pool should accumulate handles. + let count = txns.len(); + drop(txns); + + // Reopen same number — all should come from the pool. + let txns: Vec<_> = (0..count).map(|_| env.begin_ro_txn().unwrap()).collect(); + for txn in &txns { + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + } + } + + #[test] + fn drain_empties_pool() { + let (_dir, env) = test_env(); + seed(&env); + + // Pool a handle. + let txn = env.begin_ro_txn().unwrap(); + drop(txn); + + env.ro_txn_pool().drain(); + + // Pool is empty — get should return None. + assert!(env.ro_txn_pool().pop().is_none()); + } + + #[test] + fn committed_txn_is_not_pooled() { + let (_dir, env) = test_env(); + seed(&env); + + let txn = env.begin_ro_txn().unwrap(); + txn.commit().unwrap(); + + // Committed txns are freed by mdbx, not returned to pool. + assert!(env.ro_txn_pool().pop().is_none()); + } + + #[test] + fn multithreaded_pool_usage() { + let (_dir, env) = test_env(); + seed(&env); + + let env = std::sync::Arc::new(env); + let barrier = std::sync::Arc::new(std::sync::Barrier::new(8)); + + let handles: Vec<_> = (0..8) + .map(|_| { + let env = env.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + barrier.wait(); + for _ in 0..100 { + let txn = env.begin_ro_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + drop(txn); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + } + + #[test] + fn multithreaded_mixed_read_write() { + let (_dir, env) = test_env(); + seed(&env); + + let env = std::sync::Arc::new(env); + let barrier = std::sync::Arc::new(std::sync::Barrier::new(5)); + + // Spawn reader threads. + let mut handles: Vec<_> = (0..4) + .map(|_| { + let env = env.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + barrier.wait(); + for _ in 0..50 { + let txn = env.begin_ro_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + // key may or may not exist depending on writer timing. + let _val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + drop(txn); + } + }) + }) + .collect(); + + // Spawn a writer thread. + handles.push(std::thread::spawn(move || { + barrier.wait(); + for i in 0u32..20 { + let tx = env.begin_rw_txn().unwrap(); + let db = tx.open_db(None).unwrap(); + tx.put(db.dbi(), i.to_le_bytes(), b"v", WriteFlags::empty()).unwrap(); + tx.commit().unwrap(); + } + })); + + for h in handles { + h.join().unwrap(); + } + } + + #[test] + fn multithreaded_concurrent_open_close() { + let (_dir, env) = test_env(); + seed(&env); + + let env = std::sync::Arc::new(env); + let barrier = std::sync::Arc::new(std::sync::Barrier::new(16)); + + // 16 threads each open and close 200 txns — exercises pool contention. + let handles: Vec<_> = (0..16) + .map(|_| { + let env = env.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + barrier.wait(); + for _ in 0..200 { + let txn = env.begin_ro_txn().unwrap(); + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + // Intentionally don't call drop explicitly — let scope handle it. + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + } + + #[test] + fn multithreaded_hold_multiple_txns() { + let (_dir, env) = test_env(); + seed(&env); + + let env = std::sync::Arc::new(env); + let barrier = std::sync::Arc::new(std::sync::Barrier::new(8)); + + // Each thread holds multiple txns open simultaneously, then drops them all. + let handles: Vec<_> = (0..8) + .map(|_| { + let env = env.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + barrier.wait(); + for _ in 0..20 { + let txns: Vec<_> = (0..4).map(|_| env.begin_ro_txn().unwrap()).collect(); + for txn in &txns { + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + } + drop(txns); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + } + + #[test] + fn multithreaded_drain_under_contention() { + let (_dir, env) = test_env(); + seed(&env); + + let env = std::sync::Arc::new(env); + + // Fill the pool with handles. + { + let txns: Vec<_> = (0..16).map(|_| env.begin_ro_txn().unwrap()).collect(); + drop(txns); + } + + let barrier = std::sync::Arc::new(std::sync::Barrier::new(5)); + + // 4 threads racing to get from pool + 1 thread draining. + let mut handles: Vec<_> = (0..4) + .map(|_| { + let env = env.clone(); + let barrier = barrier.clone(); + std::thread::spawn(move || { + barrier.wait(); + for _ in 0..50 { + let _txn = env.begin_ro_txn(); // may or may not get a pooled handle + } + }) + }) + .collect(); + + handles.push(std::thread::spawn(move || { + barrier.wait(); + env.ro_txn_pool().drain(); + })); + + for h in handles { + h.join().unwrap(); + } + } + + #[test] + fn pool_overflow_aborts_excess() { + let dir = tempfile::tempdir().unwrap(); + let env = Environment::builder().set_max_readers(512).open(dir.path()).unwrap(); + seed(&env); + + // Open more txns than the pool capacity (256), drop them all. + let txns: Vec<_> = (0..300).map(|_| env.begin_ro_txn().unwrap()).collect(); + drop(txns); + + // Pool is capped at 256; excess handles are aborted. + assert_eq!(env.ro_txn_pool().queue.len(), 256); + + // All 256 pooled handles should still work. + let txns: Vec<_> = (0..256).map(|_| env.begin_ro_txn().unwrap()).collect(); + for txn in &txns { + let db = txn.open_db(None).unwrap(); + let val: Option<[u8; 3]> = txn.get(db.dbi(), b"key").unwrap(); + assert_eq!(val, Some(*b"val")); + } + } +}