fix: call cancel_all_background_work on RocksDBProviderInner drop (#20895)

This commit is contained in:
joshieDo
2026-01-09 19:53:31 +00:00
committed by GitHub
parent 0db3813941
commit 485fa3448d

View File

@@ -12,8 +12,8 @@ use reth_storage_errors::{
};
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, CompactionPri, DBCompressionType,
DBRawIteratorWithThreadMode, IteratorMode, Options, Transaction, TransactionDB,
TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction, WriteOptions,
DBRawIteratorWithThreadMode, IteratorMode, OptimisticTransactionDB,
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
fmt,
@@ -200,20 +200,17 @@ impl RocksDBBuilder {
})
.collect();
// Use TransactionDB for MDBX-like transaction semantics (read-your-writes, rollback)
let txn_db_options = TransactionDBOptions::default();
let db = TransactionDB::open_cf_descriptors(
&options,
&txn_db_options,
&self.path,
cf_descriptors,
)
.map_err(|e| {
ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
// Use OptimisticTransactionDB for MDBX-like transaction semantics (read-your-writes,
// rollback). OptimisticTransactionDB uses optimistic concurrency control (conflict
// detection at commit) and is backed by DBCommon, giving us access to
// cancel_all_background_work for clean shutdown.
let db = OptimisticTransactionDB::open_cf_descriptors(&options, &self.path, cf_descriptors)
.map_err(|e| {
ProviderError::Database(DatabaseError::Open(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
let metrics = self.enable_metrics.then(RocksDBMetrics::default);
@@ -241,8 +238,8 @@ pub struct RocksDBProvider(Arc<RocksDBProviderInner>);
/// Inner state for `RocksDB` provider.
///
/// Held behind the `Arc` inside `RocksDBProvider`, so every clone of the provider shares this
/// one instance.
struct RocksDBProviderInner {
/// `RocksDB` database instance with transaction support.
db: TransactionDB,
/// `RocksDB` database instance with optimistic transaction support.
db: OptimisticTransactionDB,
/// Metrics latency & operations.
///
/// `None` when the builder was not asked to enable metrics.
metrics: Option<RocksDBMetrics>,
}
@@ -250,12 +247,20 @@ struct RocksDBProviderInner {
/// Hand-written `Debug` output for the provider's inner state.
///
/// The database handle is rendered as a fixed placeholder string instead of its contents;
/// only the `metrics` field is formatted through its own `Debug` implementation.
impl fmt::Debug for RocksDBProviderInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBProviderInner")
// Placeholder text stands in for the database handle.
.field("db", &"<TransactionDB>")
.field("db", &"<OptimisticTransactionDB>")
.field("metrics", &self.metrics)
.finish()
}
}
/// Stops `RocksDB` background work when the inner provider state is dropped.
impl Drop for RocksDBProviderInner {
    fn drop(&mut self) {
        // Background jobs (compaction, flush) must be cancelled before the database handle
        // itself is dropped; otherwise shutdown can hit pthread lock errors.
        let wait = true;
        self.db.cancel_all_background_work(wait);
    }
}
impl Clone for RocksDBProvider {
fn clone(&self) -> Self {
Self(self.0.clone())
@@ -274,9 +279,12 @@ impl RocksDBProvider {
}
/// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
///
/// The transaction is opened with default write options and default transaction options.
/// The returned [`RocksTx`] borrows this provider for its whole lifetime.
///
/// Note: With `OptimisticTransactionDB`, commits may fail if there are conflicts.
/// Conflict detection happens at commit time, not at write time.
pub fn tx(&self) -> RocksTx<'_> {
let write_options = WriteOptions::default();
let txn_options = TransactionOptions::default();
let txn_options = OptimisticTransactionOptions::default();
let inner = self.0.db.transaction_opt(&write_options, &txn_options);
// Keep a reference back to the provider next to the raw transaction handle.
RocksTx { inner, provider: self }
}
@@ -564,7 +572,7 @@ impl<'a> RocksDBBatch<'a> {
/// Note: `Transaction` is `Send` but NOT `Sync`. This wrapper does not implement
/// `DbTx`/`DbTxMut` traits directly; use RocksDB-specific methods instead.
pub struct RocksTx<'db> {
/// Underlying `rocksdb` transaction handle that reads and writes go through.
inner: Transaction<'db, TransactionDB>,
inner: Transaction<'db, OptimisticTransactionDB>,
/// Provider that created this transaction.
provider: &'db RocksDBProvider,
}
@@ -747,7 +755,7 @@ impl<'db> RocksTx<'db> {
})?;
// Create a raw iterator to access key bytes directly.
let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>> =
let mut iter: DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>> =
self.inner.raw_iterator_cf(&cf);
// Seek to the smallest key >= encoded_key.
@@ -825,7 +833,7 @@ impl<'db> RocksTx<'db> {
/// Returns an error if the raw iterator is in an invalid state due to an I/O error.
fn raw_iter_status_ok(
iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>,
iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, OptimisticTransactionDB>>,
) -> ProviderResult<()> {
iter.status().map_err(|e| {
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
@@ -840,7 +848,7 @@ impl<'db> RocksTx<'db> {
///
/// Yields decoded `(Key, Value)` pairs in key order.
pub struct RocksDBIter<'db, T: Table> {
/// Underlying `rocksdb` iterator over the database.
inner: rocksdb::DBIteratorWithThreadMode<'db, TransactionDB>,
inner: rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>,
/// Carries the table type `T` without storing a value of it.
_marker: std::marker::PhantomData<T>,
}
@@ -884,7 +892,7 @@ impl<T: Table> Iterator for RocksDBIter<'_, T> {
///
/// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.
pub struct RocksTxIter<'tx, T: Table> {
/// Underlying `rocksdb` iterator created from a transaction.
inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, TransactionDB>>,
inner: rocksdb::DBIteratorWithThreadMode<'tx, Transaction<'tx, OptimisticTransactionDB>>,
/// Carries the table type `T` without storing a value of it.
_marker: std::marker::PhantomData<T>,
}
@@ -1092,7 +1100,7 @@ mod tests {
.build()
.unwrap();
// Do operations - data should be immediately readable with TransactionDB
// Do operations - data should be immediately readable with OptimisticTransactionDB
for i in 0..10 {
let value = vec![i as u8];
provider.put::<TestTable>(i, &value).unwrap();
@@ -1107,7 +1115,7 @@ mod tests {
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Insert data - TransactionDB writes are immediately visible
// Insert data - OptimisticTransactionDB writes are immediately visible
let value = vec![42u8; 1000];
for i in 0..100 {
provider.put::<TestTable>(i, &value).unwrap();