feat(storage): use RocksDBBatch in EitherWriter and related modules (#20377)

This commit is contained in:
YK
2025-12-16 11:57:41 +08:00
committed by GitHub
parent dd9ff731e4
commit 40e8241bf5
4 changed files with 93 additions and 41 deletions

View File

@@ -4,7 +4,7 @@
use std::{marker::PhantomData, ops::Range};
#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksTx;
use crate::providers::rocksdb::RocksDBBatch;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
@@ -36,11 +36,12 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
<P as NodePrimitivesProvider>::Primitives,
>;
// Helper type so constructors stay exported even when RocksDB feature is off.
// Helper types so constructors stay exported even when RocksDB feature is off.
// Historical data tables use a write-only RocksDB batch (no read-your-writes needed).
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>;
type RocksBatchArg<'a> = crate::providers::rocksdb::RocksDBBatch<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxArg<'a> = ();
type RocksBatchArg<'a> = ();
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
@@ -54,9 +55,9 @@ pub enum EitherWriter<'a, CURSOR, N> {
Database(CURSOR),
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` transaction
/// Write to `RocksDB` using a write-only batch (historical tables).
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksTx<'a>),
RocksDB(RocksDBBatch<'a>),
}
impl<'a> EitherWriter<'a, (), ()> {
@@ -129,7 +130,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -137,7 +138,7 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
@@ -146,7 +147,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -154,7 +155,7 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(
@@ -165,7 +166,7 @@ impl<'a> EitherWriter<'a, (), ()> {
/// Creates a new [`EitherWriter`] for account history based on storage settings.
pub fn new_accounts_history<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
_rocksdb_batch: RocksBatchArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
@@ -173,7 +174,7 @@ impl<'a> EitherWriter<'a, (), ()> {
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().account_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
}
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
@@ -207,18 +208,6 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
/// Commits the `RocksDB` transaction if this is a `RocksDB` writer.
///
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
/// different commit patterns (MDBX transaction commit, static file sync).
#[cfg(all(unix, feature = "rocksdb"))]
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Database(_) | Self::StaticFile(_) => Ok(()),
Self::RocksDB(tx) => tx.commit(),
}
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>

View File

@@ -35,7 +35,7 @@ pub use consistent::ConsistentProvider;
#[cfg_attr(not(all(unix, feature = "rocksdb")), path = "rocksdb_stub.rs")]
pub(crate) mod rocksdb;
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksTx};
pub use rocksdb::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].

View File

@@ -2,4 +2,4 @@
mod metrics;
mod provider;
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksTx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};

View File

@@ -34,6 +34,11 @@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
/// Default bloom filter bits per key (~1% false positive rate).
const DEFAULT_BLOOM_FILTER_BITS: f64 = 10.0;
/// Default buffer capacity for compression in batches.
/// 4 KiB matches common block/page sizes and comfortably holds typical history values,
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
path: PathBuf,
@@ -261,6 +266,18 @@ impl RocksDBProvider {
RocksTx { inner, provider: self }
}
/// Creates a new batch for atomic writes.
///
/// Use [`Self::write_batch`] for closure-based atomic writes.
/// Use this method when the batch needs to be held by [`crate::EitherWriter`].
pub fn batch(&self) -> RocksDBBatch<'_> {
    // Pre-size the compression scratch buffer so typical history values
    // avoid the first few reallocations.
    let buf = Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY);
    let inner = WriteBatchWithTransaction::<true>::default();
    RocksDBBatch { provider: self, inner, buf }
}
/// Gets the column family handle for a table.
fn get_cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
self.0
@@ -356,29 +373,21 @@ impl RocksDBProvider {
where
F: FnOnce(&mut RocksDBBatch<'_>) -> ProviderResult<()>,
{
// Note: Using "Batch" as table name for batch operations across multiple tables
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, "Batch", |this| {
let mut batch_handle = RocksDBBatch {
provider: this,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::new(),
};
let mut batch_handle = this.batch();
f(&mut batch_handle)?;
this.0.db.write(batch_handle.inner).map_err(|e| {
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})
batch_handle.commit()
})
}
}
/// Handle for building a batch of operations atomically.
///
/// Uses `WriteBatchWithTransaction<true>` for compatibility with `TransactionDB`.
/// Uses `WriteBatchWithTransaction` for atomic writes without full transaction overhead.
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
provider: &'a RocksDBProvider,
inner: WriteBatchWithTransaction<true>,
@@ -422,6 +431,28 @@ impl<'a> RocksDBBatch<'a> {
self.inner.delete_cf(self.provider.get_cf_handle::<T>()?, key.encode().as_ref());
Ok(())
}
/// Commits the batch to the database.
///
/// This consumes the batch and writes all operations atomically to `RocksDB`.
pub fn commit(self) -> ProviderResult<()> {
    let write_opts = WriteOptions::default();
    // Map the RocksDB error into the provider-level commit error.
    match self.provider.0.db.write_opt(self.inner, &write_opts) {
        Ok(()) => Ok(()),
        Err(e) => Err(ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
            message: e.to_string().into(),
            code: -1,
        }))),
    }
}
/// Returns the number of write operations (puts + deletes) queued in this batch.
pub fn len(&self) -> usize {
    self.inner.len()
}

/// Returns `true` if the batch contains no operations.
pub fn is_empty(&self) -> bool {
    // Delegate to `len` so both accessors agree on the underlying count.
    self.len() == 0
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -839,4 +870,36 @@ mod tests {
// Commit
tx.commit().unwrap();
}
#[test]
fn test_batch_manual_commit() {
    let temp_dir = TempDir::new().unwrap();
    let provider =
        RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

    // Build a standalone batch via provider.batch() and queue ten entries.
    let entries: Vec<(u64, Vec<u8>)> =
        (0..10u64).map(|key| (key, format!("batch_value_{key}").into_bytes())).collect();
    let mut batch = provider.batch();
    for (key, value) in &entries {
        batch.put::<TestTable>(*key, value).unwrap();
    }

    // Size accessors reflect the queued operations.
    assert_eq!(batch.len(), 10);
    assert!(!batch.is_empty());

    // Nothing is persisted until the batch is committed.
    assert_eq!(provider.get::<TestTable>(0).unwrap(), None);

    batch.commit().unwrap();

    // After commit, every queued entry is readable.
    for (key, value) in entries {
        assert_eq!(provider.get::<TestTable>(key).unwrap(), Some(value));
    }
}
}