refactor(storage): introduce RocksDBWriteMode for flexible write strategies

- Replaced RocksTx with RocksDBWriteMode in EitherWriter so it can hold either a transaction or a write batch.
- Updated the EitherWriter constructors to take RocksDBWriteMode, letting call sites choose the write strategy per workload (see the sketch below).
- Added RocksDBProvider::batch() and a manually committed RocksDBBatch, which logs a warning if dropped without commit.
- Clarified documentation on usage and commit ordering.
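
A rough call-site sketch using only items introduced or re-exported in this diff; the provider handles, table, key, and value are placeholders, and the surrounding function is assumed to return a ProviderResult:

```rust
// Hedged sketch: `rocks` is a RocksDBProvider and `provider` satisfies the
// DBProvider + NodePrimitivesProvider + StorageSettingsCache bounds shown in
// this diff; both are assumed to exist at the call site.
let mode = RocksDBWriteMode::Batch(rocks.batch()); // or ::Transaction(rocks.tx())
let writer = EitherWriter::new_storages_history(&provider, mode)?;
// ... stage history writes through `writer` ...
// Commit the outer MDBX transaction first, then the RocksDB side (see the
// commit() docs in this change); commit() is a no-op for the Database and
// StaticFile variants.
writer.commit()?;
```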
commit 5433e6dd8e (parent 679234f105)
Author: yongkangc
Date:   2025-12-15 05:10:02 +00:00
4 changed files with 352 additions and 15 deletions


@@ -4,7 +4,7 @@
use std::{marker::PhantomData, ops::Range};
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksTx;
use crate::providers::rocksdb::RocksDBWriteMode;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
@@ -36,11 +36,12 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
<P as NodePrimitivesProvider>::Primitives,
>;
// Helper type so constructors stay exported even when RocksDB feature is off.
// Helper types so constructors stay exported even when RocksDB feature is off.
// RocksDBWriteMode encapsulates the choice between Transaction and Batch writes.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>;
type RocksWriteModeArg<'a> = crate::providers::rocksdb::RocksDBWriteMode<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxArg<'a> = ();
type RocksWriteModeArg<'a> = ();
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
@@ -54,9 +55,12 @@ pub enum EitherWriter<'a, CURSOR, N> {
Database(CURSOR),
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` transaction
/// Write to `RocksDB` (transaction or batch - internal detail).
///
/// Uses [`RocksDBWriteMode`] to encapsulate the choice between full transaction
/// semantics and high-throughput batch writes.
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksTx<'a>),
RocksDB(RocksDBWriteMode<'a>),
}
impl<'a> EitherWriter<'a, (), ()> {
@@ -127,9 +131,11 @@ impl<'a> EitherWriter<'a, (), ()> {
}
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -137,16 +143,37 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
}
/// Creates a new [`EitherWriter`] for accounts history based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
}
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
///
/// Accepts either Transaction or Batch mode via [`RocksDBWriteMode`].
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
_rocksdb_mode: RocksWriteModeArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -154,7 +181,7 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
return Ok(EitherWriter::RocksDB(_rocksdb_mode));
}
Ok(EitherWriter::Database(
@@ -191,15 +218,21 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
}
}
/// Commits the `RocksDB` transaction if this is a `RocksDB` writer.
/// Commits `RocksDB` writes if this is a `RocksDB` writer.
///
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
/// different commit patterns (MDBX transaction commit, static file sync).
///
/// # Commit Order
///
/// Call this AFTER the outer MDBX transaction commits successfully. This ensures
/// that if `RocksDB` commit fails, the primary data (MDBX) is still intact and
/// the `RocksDB` data (which is derived) can be rebuilt.
#[cfg(all(unix, feature = "rocksdb"))]
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Database(_) | Self::StaticFile(_) => Ok(()),
Self::RocksDB(tx) => tx.commit(),
Self::RocksDB(mode) => mode.commit(),
}
}
}
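
A minimal ordering sketch for the commit-order rule documented above; `mdbx_tx` is a placeholder for the outer MDBX transaction and is not part of this diff, only `writer.commit()` is:

```rust
// Hedged sketch of the required commit order.
mdbx_tx.commit()?;   // 1. primary data (MDBX) becomes durable first
writer.commit()?;    // 2. then the derived RocksDB data; if this fails, the
                     //    MDBX state is intact and the RocksDB side can be
                     //    rebuilt from it
```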


@@ -34,7 +34,7 @@ pub use consistent::ConsistentProvider;
#[cfg(all(unix, feature = "rocksdb"))]
pub(crate) mod rocksdb;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksTx};
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].


@@ -2,4 +2,7 @@
mod metrics;
mod provider;
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksTx};
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksDBWriteMode, RocksTx};
#[cfg(test)]
pub(crate) use provider::RocksDBBatch;


@@ -18,6 +18,7 @@ use std::{
sync::Arc,
time::Instant,
};
use tracing::warn;
/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
@@ -259,6 +260,26 @@ impl RocksDBProvider {
RocksTx { inner, provider: self }
}
/// Creates a new batch for manual commit.
///
/// Use [`Self::write_batch`] for closure-based atomic writes.
/// Use this method when the batch needs to be held by [`EitherWriter`](crate::EitherWriter).
///
/// # Example
///
/// ```ignore
/// let batch = provider.batch();
/// batch.put::<SomeTable>(key, &value)?;
/// batch.commit()?;
/// ```
pub fn batch(&self) -> RocksDBBatch<'_> {
RocksDBBatch {
provider: self,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
}
}
/// Gets the column family handle for a table.
fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
self.0
@@ -364,7 +385,9 @@ impl RocksDBProvider {
f(&mut batch_handle)?;
this.0.db.write(batch_handle.inner).map_err(|e| {
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut batch_handle.inner);
this.0.db.write(batch).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
@@ -420,6 +443,46 @@ impl<'a> RocksDBBatch<'a> {
self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
Ok(())
}
/// Commits the batch to the database.
///
/// This consumes the batch and writes all operations atomically to `RocksDB`.
///
/// # Errors
///
/// Returns an error if the write fails.
pub fn commit(mut self) -> ProviderResult<()> {
// Take ownership of inner to prevent Drop from logging a warning
let batch = std::mem::take(&mut self.inner);
self.provider.0.db.write(batch).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
}
/// Returns the number of operations in this batch.
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns `true` if the batch contains no operations.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl Drop for RocksDBBatch<'_> {
fn drop(&mut self) {
if !self.inner.is_empty() {
warn!(
target: "reth::storage",
batch_len = %self.inner.len(),
"RocksDBBatch dropped without commit - data discarded"
);
}
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -583,6 +646,49 @@ impl<T: Table> Iterator for RocksTxIter<'_, T> {
}
}
/// `RocksDB` write strategy - internal implementation detail.
///
/// This enum encapsulates the choice between full transaction semantics
/// and high-throughput batch writes. Use [`RocksDBWriteMode::Transaction`] for
/// read-modify-write operations that need read-your-writes semantics.
/// Use [`RocksDBWriteMode::Batch`] for bulk sync operations where
/// read-your-writes is not needed.
pub enum RocksDBWriteMode<'a> {
/// Full transaction with read-your-writes, rollback support.
/// Use for read-modify-write operations.
Transaction(RocksTx<'a>),
/// Write-only batch for maximum throughput.
/// Use for bulk sync operations where read-your-writes is not needed.
Batch(RocksDBBatch<'a>),
}
impl fmt::Debug for RocksDBWriteMode<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Transaction(tx) => f.debug_tuple("Transaction").field(tx).finish(),
Self::Batch(batch) => f.debug_tuple("Batch").field(batch).finish(),
}
}
}
impl<'a> RocksDBWriteMode<'a> {
/// Puts a value into the specified table.
pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.put::<T>(key, value),
Self::Batch(batch) => batch.put::<T>(key, value),
}
}
/// Commits the transaction or batch.
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Transaction(tx) => tx.commit(),
Self::Batch(batch) => batch.commit(),
}
}
}
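
Illustrative only (the tests further down exercise the same paths): picking a variant by workload, with a placeholder provider, table, key, and value:

```rust
// Hedged sketch: `provider` is a RocksDBProvider; `SomeTable`, `key`, and
// `value` are placeholders.

// Bulk sync: no reads needed, favor throughput.
let mut mode = RocksDBWriteMode::Batch(provider.batch());
mode.put::<SomeTable>(key, &value)?;
mode.commit()?;

// Read-modify-write: needs read-your-writes, so use a transaction.
let mut mode = RocksDBWriteMode::Transaction(provider.tx());
mode.put::<SomeTable>(key, &value)?;
mode.commit()?;
```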
/// Converts Reth's [`LogLevel`] to `RocksDB`'s [`rocksdb::LogLevel`].
const fn convert_log_level(level: LogLevel) -> rocksdb::LogLevel {
match level {
@@ -837,4 +943,199 @@ mod tests {
// Commit
tx.commit().unwrap();
}
#[test]
fn test_batch_manual_commit() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a batch via provider.batch()
let mut batch = provider.batch();
// Add entries
for i in 0..10u64 {
let value = format!("batch_value_{i}").into_bytes();
batch.put::<TestTable>(i, &value).unwrap();
}
// Verify len/is_empty
assert_eq!(batch.len(), 10);
assert!(!batch.is_empty());
// Data should NOT be visible before commit
assert_eq!(provider.get::<TestTable>(0).unwrap(), None);
// Commit the batch
batch.commit().unwrap();
// Now data should be visible
for i in 0..10u64 {
let value = format!("batch_value_{i}").into_bytes();
assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(value));
}
}
#[test]
fn test_write_mode_batch() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Batch
let batch = provider.batch();
let mut mode = RocksDBWriteMode::Batch(batch);
// Write via RocksDBWriteMode
let key = 42u64;
let value = b"test_via_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_write_mode_transaction() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create write mode using Transaction
let tx = provider.tx();
let mut mode = RocksDBWriteMode::Transaction(tx);
// Write via RocksDBWriteMode
let key = 100u64;
let value = b"test_via_tx_mode".to_vec();
mode.put::<TestTable>(key, &value).unwrap();
// Commit via RocksDBWriteMode
mode.commit().unwrap();
// Verify data is visible
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
}
#[test]
fn test_batch_empty_commit() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create an empty batch
let batch = provider.batch();
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
// Commit should succeed (no-op)
batch.commit().unwrap();
}
#[test]
fn test_batch_drop_without_commit_does_not_persist() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Create a batch and add entries but DON'T commit
{
let mut batch = provider.batch();
for i in 0..5u64 {
let value = format!("dropped_value_{i}").into_bytes();
batch.put::<TestTable>(i, &value).unwrap();
}
// batch dropped here without commit - should log warning
}
// Data should NOT be persisted
for i in 0..5u64 {
assert_eq!(
provider.get::<TestTable>(i).unwrap(),
None,
"Dropped batch should not persist data"
);
}
}
#[test]
fn test_write_mode_debug_formatting() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// Test Batch variant debug output
let batch = provider.batch();
let mode_batch = RocksDBWriteMode::Batch(batch);
let debug_str = format!("{:?}", mode_batch);
assert!(debug_str.contains("Batch"), "Debug should contain 'Batch': {debug_str}");
// Test Transaction variant debug output
let tx = provider.tx();
let mode_tx = RocksDBWriteMode::Transaction(tx);
let debug_str = format!("{:?}", mode_tx);
assert!(
debug_str.contains("Transaction"),
"Debug should contain 'Transaction': {debug_str}"
);
}
#[test]
fn test_batch_delete_operation() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
// First, insert some data
let mut batch = provider.batch();
for i in 0..5u64 {
batch.put::<TestTable>(i, &vec![i as u8]).unwrap();
}
batch.commit().unwrap();
// Verify data exists
for i in 0..5u64 {
assert!(provider.get::<TestTable>(i).unwrap().is_some());
}
// Now delete via batch
let mut delete_batch = provider.batch();
for i in 0..5u64 {
delete_batch.delete::<TestTable>(i).unwrap();
}
delete_batch.commit().unwrap();
// Verify data is deleted
for i in 0..5u64 {
assert_eq!(provider.get::<TestTable>(i).unwrap(), None);
}
}
#[test]
fn test_batch_overwrite_existing_key() {
let temp_dir = TempDir::new().unwrap();
let provider =
RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();
let key = 42u64;
let initial_value = b"initial".to_vec();
let updated_value = b"updated".to_vec();
// Insert initial value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &initial_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(initial_value));
// Overwrite with new value
let mut batch = provider.batch();
batch.put::<TestTable>(key, &updated_value).unwrap();
batch.commit().unwrap();
assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(updated_value));
}
}