mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(storage): make it possible to disable read tx timeout (#6181)
This commit is contained in:
@@ -1180,7 +1180,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
|
||||
.provider()?
|
||||
// State root calculation can take a while, and we're sure no write transaction
|
||||
// will be open in parallel. See https://github.com/paradigmxyz/reth/issues/6168.
|
||||
.disable_backtrace_on_long_read_transaction();
|
||||
.disable_long_read_transaction_safety();
|
||||
let (state_root, trie_updates) = hashed_state
|
||||
.state_root_with_updates(provider.tx_ref())
|
||||
.map_err(Into::<DatabaseError>::into)?;
|
||||
|
||||
@@ -67,7 +67,7 @@ impl DbTx for TxMock {
|
||||
Ok(self._table.len())
|
||||
}
|
||||
|
||||
fn disable_backtrace_on_long_read_transaction(&mut self) {}
|
||||
fn disable_long_read_transaction_safety(&mut self) {}
|
||||
}
|
||||
|
||||
impl DbTxMut for TxMock {
|
||||
|
||||
@@ -24,8 +24,8 @@ pub trait DbTx: Send + Sync {
|
||||
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError>;
|
||||
/// Returns number of entries in the table.
|
||||
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
|
||||
/// Disables backtrace recording for this read transaction when it's open for too long.
|
||||
fn disable_backtrace_on_long_read_transaction(&mut self);
|
||||
/// Disables long-lived read transaction safety guarantees.
|
||||
fn disable_long_read_transaction_safety(&mut self);
|
||||
}
|
||||
|
||||
/// Read write transaction that allows writing to database
|
||||
|
||||
@@ -76,10 +76,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
let dbi_handle = handles.get_mut(table as usize).expect("should exist");
|
||||
if dbi_handle.is_none() {
|
||||
*dbi_handle = Some(
|
||||
self.inner
|
||||
.open_db(Some(T::NAME))
|
||||
.map_err(|e| DatabaseError::InitCursor(e.into()))?
|
||||
.dbi(),
|
||||
self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?.dbi(),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -289,11 +286,14 @@ impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
.entries())
|
||||
}
|
||||
|
||||
/// Disables backtrace recording for this read transaction when it's open for too long.
|
||||
fn disable_backtrace_on_long_read_transaction(&mut self) {
|
||||
/// Disables long-lived read transaction safety guarantees, such as backtrace recording and
|
||||
/// timeout.
|
||||
fn disable_long_read_transaction_safety(&mut self) {
|
||||
if let Some(metrics_handler) = self.metrics_handler.as_mut() {
|
||||
metrics_handler.record_backtrace = false;
|
||||
}
|
||||
|
||||
self.inner.disable_timeout();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -353,3 +353,57 @@ impl DbTxMut for Tx<RW> {
|
||||
self.new_cursor()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
database::Database,
|
||||
mdbx::{tx::LONG_TRANSACTION_DURATION, DatabaseArguments},
|
||||
tables,
|
||||
transaction::DbTx,
|
||||
DatabaseEnv, DatabaseEnvKind,
|
||||
};
|
||||
use reth_interfaces::db::DatabaseError;
|
||||
use reth_libmdbx::MaxReadTransactionDuration;
|
||||
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
fn long_read_transaction_safety_disabled() {
|
||||
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let args = DatabaseArguments::default()
|
||||
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
|
||||
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
|
||||
|
||||
let mut tx = db.tx().unwrap();
|
||||
tx.disable_long_read_transaction_safety();
|
||||
sleep(MAX_DURATION.max(LONG_TRANSACTION_DURATION));
|
||||
|
||||
assert_eq!(
|
||||
tx.get::<tables::Transactions>(0).err(),
|
||||
Some(DatabaseError::Open(reth_libmdbx::Error::NotFound.to_err_code()))
|
||||
); // Transaction is not timeout-ed
|
||||
assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is not recorded
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn long_read_transaction_safety_enabled() {
|
||||
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let args = DatabaseArguments::default()
|
||||
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
|
||||
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();
|
||||
|
||||
let tx = db.tx().unwrap();
|
||||
sleep(MAX_DURATION.max(LONG_TRANSACTION_DURATION));
|
||||
|
||||
assert_eq!(
|
||||
tx.get::<tables::Transactions>(0).err(),
|
||||
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.to_err_code()))
|
||||
); // Transaction is timeout-ed
|
||||
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is recorded
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,6 +285,14 @@ where
|
||||
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
|
||||
Cursor::new(self.clone(), dbi)
|
||||
}
|
||||
|
||||
/// Disables a timeout for this read transaction.
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
pub fn disable_timeout(&self) {
|
||||
if K::IS_READ_ONLY {
|
||||
self.env().txn_manager().remove_active_read_transaction(self.txn());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> Clone for Transaction<K>
|
||||
|
||||
@@ -246,14 +246,14 @@ impl<TX: DbTx> DatabaseProvider<TX> {
|
||||
.collect::<Result<Vec<_>, DatabaseError>>()
|
||||
}
|
||||
|
||||
/// Disables backtrace recording for the underlying read database transaction when it's open for
|
||||
/// too long.
|
||||
/// Disables long-lived read transaction safety guarantees for leaks prevention and
|
||||
/// observability improvements.
|
||||
///
|
||||
/// CAUTION: In most of the cases, you want to keep backtraces on long read transactions
|
||||
/// CAUTION: In most of the cases, you want the safety guarantees for long read transactions
|
||||
/// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning
|
||||
/// that Reth as a node is offline and not progressing.
|
||||
pub fn disable_backtrace_on_long_read_transaction(mut self) -> Self {
|
||||
self.tx.disable_backtrace_on_long_read_transaction();
|
||||
pub fn disable_long_read_transaction_safety(mut self) -> Self {
|
||||
self.tx.disable_long_read_transaction_safety();
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user