diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 34859e224f..ce50175fce 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -4,7 +4,7 @@ use std::{marker::PhantomData, ops::Range}; #[cfg(all(unix, feature = "rocksdb"))] -use crate::providers::rocksdb::RocksTx; +use crate::providers::rocksdb::RocksDBBatch; use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut}, StaticFileProviderFactory, @@ -36,11 +36,12 @@ type EitherWriterTy<'a, P, T> = EitherWriter<

::Primitives, >; -// Helper type so constructors stay exported even when RocksDB feature is off. +// Helper types so constructors stay exported even when RocksDB feature is off. +// Historical data tables use a write-only RocksDB batch (no read-your-writes needed). #[cfg(all(unix, feature = "rocksdb"))] -type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>; +type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>; #[cfg(not(all(unix, feature = "rocksdb")))] -type RocksTxArg<'a> = (); +type RocksBatchArg<'a> = (); #[cfg(all(unix, feature = "rocksdb"))] type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>; @@ -54,9 +55,9 @@ pub enum EitherWriter<'a, CURSOR, N> { Database(CURSOR), /// Write to static file StaticFile(StaticFileProviderRWRefMut<'a, N>), - /// Write to `RocksDB` transaction + /// Write to `RocksDB` using a write-only batch (historical tables). #[cfg(all(unix, feature = "rocksdb"))] - RocksDB(RocksTx<'a>), + RocksDB(RocksDBBatch<'a>), } impl<'a> EitherWriter<'a, (), ()> { @@ -129,7 +130,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for storages history based on storage settings. pub fn new_storages_history

( provider: &P, - _rocksdb_tx: RocksTxArg<'a>, + _rocksdb_batch: RocksBatchArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -137,7 +138,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().storages_history_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_tx)); + return Ok(EitherWriter::RocksDB(_rocksdb_batch)); } Ok(EitherWriter::Database(provider.tx_ref().cursor_write::()?)) @@ -146,7 +147,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings. pub fn new_transaction_hash_numbers

( provider: &P, - _rocksdb_tx: RocksTxArg<'a>, + _rocksdb_batch: RocksBatchArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -154,7 +155,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_tx)); + return Ok(EitherWriter::RocksDB(_rocksdb_batch)); } Ok(EitherWriter::Database( @@ -165,7 +166,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for account history based on storage settings. pub fn new_accounts_history

( provider: &P, - _rocksdb_tx: RocksTxArg<'a>, + _rocksdb_batch: RocksBatchArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -173,7 +174,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().account_history_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_tx)); + return Ok(EitherWriter::RocksDB(_rocksdb_batch)); } Ok(EitherWriter::Database(provider.tx_ref().cursor_write::()?)) @@ -207,18 +208,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> { Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider), } } - - /// Commits the `RocksDB` transaction if this is a `RocksDB` writer. - /// - /// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use - /// different commit patterns (MDBX transaction commit, static file sync). - #[cfg(all(unix, feature = "rocksdb"))] - pub fn commit(self) -> ProviderResult<()> { - match self { - Self::Database(_) | Self::StaticFile(_) => Ok(()), - Self::RocksDB(tx) => tx.commit(), - } - } } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 25acfccd19..e9d5a7c350 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -35,7 +35,7 @@ pub use consistent::ConsistentProvider; #[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")] pub(crate) mod rocksdb; -pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksTx}; +pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx}; /// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy /// [`ProviderNodeTypes`]. diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index a0bc3bebc6..db6fd3451b 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -2,4 +2,4 @@ mod metrics; mod provider; -pub use provider::{RocksDBBuilder, RocksDBProvider, RocksTx}; +pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx}; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 057ea91b2c..975772f808 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -34,6 +34,11 @@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576; /// Default bloom filter bits per key (~1% false positive rate). const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0; +/// Default buffer capacity for compression in batches. +/// 4 KiB matches common block/page sizes and comfortably holds typical history values, +/// reducing the first few reallocations without over-allocating. +const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096; + /// Builder for [`RocksDBProvider`]. pub struct RocksDBBuilder { path: PathBuf, @@ -261,6 +266,18 @@ impl RocksDBProvider { RocksTx { inner, provider: self } } + /// Creates a new batch for atomic writes. + /// + /// Use [`Self::write_batch`] for closure-based atomic writes. + /// Use this method when the batch needs to be held by [`crate::EitherWriter`]. + pub fn batch(&self) -> RocksDBBatch<'_> { + RocksDBBatch { + provider: self, + inner: WriteBatchWithTransaction::::default(), + buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY), + } + } + /// Gets the column family handle for a table. fn get_cf_handle(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> { self.0 @@ -356,29 +373,21 @@ impl RocksDBProvider { where F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>, { - // Note: Using "Batch" as table name for batch operations across multiple tables self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| { - let mut batch_handle = RocksDBBatch { - provider: this, - inner: WriteBatchWithTransaction::::default(), - buf: Vec::new(), - }; - + let mut batch_handle = this.batch(); f(&mut batch_handle)?; - - this.0.db.write(batch_handle.inner).map_err(|e| { - ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo { - message: e.to_string().into(), - code: -1, - })) - }) + batch_handle.commit() }) } } /// Handle for building a batch of operations atomically. /// -/// Uses `WriteBatchWithTransaction` for compatibility with `TransactionDB`. +/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead. +/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows +/// where you don't need to read back uncommitted data within the same operation +/// (e.g., history index writes). +#[must_use = "batch must be committed"] pub struct RocksDBBatch<'a> { provider: &'a RocksDBProvider, inner: WriteBatchWithTransaction, @@ -422,6 +431,28 @@ impl<'a> RocksDBBatch<'a> { self.inner.delete_cf(self.provider.get_cf_handle::()?, key.encode().as_ref()); Ok(()) } + + /// Commits the batch to the database. + /// + /// This consumes the batch and writes all operations atomically to `RocksDB`. + pub fn commit(self) -> ProviderResult<()> { + self.provider.0.db.write_opt(self.inner, &WriteOptions::default()).map_err(|e| { + ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + }) + } + + /// Returns the number of write operations (puts + deletes) queued in this batch. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if the batch contains no operations. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. @@ -839,4 +870,36 @@ mod tests { // Commit tx.commit().unwrap(); } + + #[test] + fn test_batch_manual_commit() { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_table::().build().unwrap(); + + // Create a batch via provider.batch() + let mut batch = provider.batch(); + + // Add entries + for i in 0..10u64 { + let value = format!("batch_value_{i}").into_bytes(); + batch.put::(i, &value).unwrap(); + } + + // Verify len/is_empty + assert_eq!(batch.len(), 10); + assert!(!batch.is_empty()); + + // Data should NOT be visible before commit + assert_eq!(provider.get::(0).unwrap(), None); + + // Commit the batch + batch.commit().unwrap(); + + // Now data should be visible + for i in 0..10u64 { + let value = format!("batch_value_{i}").into_bytes(); + assert_eq!(provider.get::(i).unwrap(), Some(value)); + } + } }