mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-09 07:17:56 -05:00
remove mut
- Changed RocksDB batch handling to own the batch instead of borrowing it, allowing for safer extraction via `into_raw_rocksdb_batch()`. - Updated the `EitherWriter` to ensure proper registration of the RocksDB batch with the provider. - Simplified the transaction hash number writing process by removing unnecessary mutable borrows. - Enhanced comments for clarity on batch extraction and usage.
This commit is contained in:
@@ -162,13 +162,13 @@ where
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let mut rocksdb_batch = rocksdb.batch();
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let mut rocksdb_batch = ();
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(provider, &mut rocksdb_batch)?;
|
||||
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
|
||||
let (hash_bytes, number_bytes) = hash_to_number?;
|
||||
@@ -186,13 +186,11 @@ where
|
||||
let tx_num = TxNumber::decompress(&number_bytes)?;
|
||||
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
|
||||
}
|
||||
// Drop writer to release mutable borrow
|
||||
drop(writer);
|
||||
|
||||
// Register RocksDB batch for commit at provider level
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if !rocksdb_batch.is_empty() {
|
||||
provider.set_pending_rocksdb_batch(rocksdb_batch.into_inner());
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
trace!(target: "sync::stages::transaction_lookup",
|
||||
@@ -223,12 +221,12 @@ where
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let mut rocksdb_batch = rocksdb.batch();
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let mut rocksdb_batch = ();
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, &mut rocksdb_batch)?;
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
|
||||
|
||||
let static_file_provider = provider.static_file_provider();
|
||||
let rev_walker = provider
|
||||
@@ -249,13 +247,11 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
// Drop writer to release mutable borrow
|
||||
drop(writer);
|
||||
|
||||
// Register RocksDB batch for commit at provider level
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if !rocksdb_batch.is_empty() {
|
||||
provider.set_pending_rocksdb_batch(rocksdb_batch.into_inner());
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
Ok(UnwindOutput {
|
||||
|
||||
@@ -43,12 +43,11 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
|
||||
|
||||
// Helper types so constructors stay exported even when RocksDB feature is off.
|
||||
// Historical data tables use a write-only RocksDB batch (no read-your-writes needed).
|
||||
// The batch is borrowed so the caller retains ownership and can access it after the writer
|
||||
// is dropped (e.g., for registering with the provider).
|
||||
// The batch is owned by the writer and extracted via `into_raw_rocksdb_batch()` after use.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
type RocksBatchArg<'a> = &'a mut crate::providers::rocksdb::RocksDBBatch;
|
||||
type RocksBatchArg = crate::providers::rocksdb::RocksDBBatch;
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
type RocksBatchArg<'a> = &'a mut ();
|
||||
type RocksBatchArg = ();
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
|
||||
@@ -63,9 +62,10 @@ pub enum EitherWriter<'a, CURSOR, N> {
|
||||
/// Write to static file
|
||||
StaticFile(StaticFileProviderRWRefMut<'a, N>),
|
||||
/// Write to `RocksDB` using a write-only batch (historical tables).
|
||||
/// The batch is borrowed and committed at provider commit time.
|
||||
/// The batch is owned and extracted via `into_raw_rocksdb_batch()` after use.
|
||||
/// Wrapped in `Option` to allow `take()` in extraction while supporting Drop.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
RocksDB(&'a mut RocksDBBatch),
|
||||
RocksDB(Option<RocksDBBatch>),
|
||||
}
|
||||
|
||||
impl<'a> EitherWriter<'a, (), ()> {
|
||||
@@ -138,7 +138,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
/// Creates a new [`EitherWriter`] for storages history based on storage settings.
|
||||
pub fn new_storages_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
_rocksdb_batch: RocksBatchArg,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
@@ -146,7 +146,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().storages_history_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch)));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
|
||||
@@ -155,7 +155,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
|
||||
pub fn new_transaction_hash_numbers<P>(
|
||||
provider: &P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
_rocksdb_batch: RocksBatchArg,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
@@ -163,7 +163,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch)));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(
|
||||
@@ -174,7 +174,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
/// Creates a new [`EitherWriter`] for account history based on storage settings.
|
||||
pub fn new_accounts_history<P>(
|
||||
provider: &P,
|
||||
_rocksdb_batch: RocksBatchArg<'a>,
|
||||
_rocksdb_batch: RocksBatchArg,
|
||||
) -> ProviderResult<EitherWriterTy<'a, P, tables::AccountsHistory>>
|
||||
where
|
||||
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
|
||||
@@ -182,7 +182,7 @@ impl<'a> EitherWriter<'a, (), ()> {
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
return Ok(EitherWriter::RocksDB(_rocksdb_batch));
|
||||
return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch)));
|
||||
}
|
||||
|
||||
Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::AccountsHistory>()?))
|
||||
@@ -216,6 +216,34 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
|
||||
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the raw `RocksDB` write batch from this writer, if it contains one.
|
||||
///
|
||||
/// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant,
|
||||
/// `None` for other variants.
|
||||
///
|
||||
/// This is used to defer `RocksDB` commits to the provider level, ensuring all
|
||||
/// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place.
|
||||
///
|
||||
/// # Important
|
||||
///
|
||||
/// This method **must** be called for `RocksDB` writers to avoid data loss.
|
||||
/// Dropping an `EitherWriter::RocksDB` without calling this method will panic
|
||||
/// in debug builds and log an error in release builds.
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
#[must_use = "the returned batch must be registered with the provider via set_pending_rocksdb_batch()"]
|
||||
pub fn into_raw_rocksdb_batch(mut self) -> Option<rocksdb::WriteBatchWithTransaction<true>> {
|
||||
match &mut self {
|
||||
Self::Database(_) | Self::StaticFile(_) => None,
|
||||
Self::RocksDB(batch) => batch.take().map(|b| b.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stub version for non-RocksDB builds.
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
pub fn into_raw_rocksdb_batch(self) -> Option<()> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
|
||||
@@ -327,7 +355,10 @@ where
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::TransactionHashNumbers>(hash, &tx_num),
|
||||
Self::RocksDB(batch) => batch
|
||||
.as_mut()
|
||||
.expect("batch taken")
|
||||
.put::<tables::TransactionHashNumbers>(hash, &tx_num),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,7 +373,9 @@ where
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::TransactionHashNumbers>(hash),
|
||||
Self::RocksDB(batch) => {
|
||||
batch.as_mut().expect("batch taken").delete::<tables::TransactionHashNumbers>(hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -361,7 +394,9 @@ where
|
||||
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
|
||||
Self::RocksDB(batch) => {
|
||||
batch.as_mut().expect("batch taken").put::<tables::StoragesHistory>(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -376,7 +411,9 @@ where
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
|
||||
Self::RocksDB(batch) => {
|
||||
batch.as_mut().expect("batch taken").delete::<tables::StoragesHistory>(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -395,7 +432,9 @@ where
|
||||
Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
|
||||
Self::RocksDB(batch) => {
|
||||
batch.as_mut().expect("batch taken").put::<tables::AccountsHistory>(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -410,7 +449,9 @@ where
|
||||
}
|
||||
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
Self::RocksDB(batch) => batch.delete::<tables::AccountsHistory>(key),
|
||||
Self::RocksDB(batch) => {
|
||||
batch.as_mut().expect("batch taken").delete::<tables::AccountsHistory>(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -721,25 +762,22 @@ mod rocksdb_tests {
|
||||
|
||||
// Get the RocksDB batch from the provider
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let mut batch = rocksdb.batch();
|
||||
let batch = rocksdb.batch();
|
||||
|
||||
// Create EitherWriter with RocksDB
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
{
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
|
||||
// Verify we got a RocksDB writer
|
||||
assert!(matches!(writer, EitherWriter::RocksDB(_)));
|
||||
// Verify we got a RocksDB writer
|
||||
assert!(matches!(writer, EitherWriter::RocksDB(_)));
|
||||
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
}
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
|
||||
// Register the batch with provider for commit
|
||||
if !batch.is_empty() {
|
||||
provider.set_pending_rocksdb_batch(batch.into_inner());
|
||||
// Extract and register the batch with provider for commit
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
// Commit via provider - this commits RocksDB batch too
|
||||
@@ -770,17 +808,14 @@ mod rocksdb_tests {
|
||||
assert_eq!(rocksdb.get::<tables::TransactionHashNumbers>(hash).unwrap(), Some(tx_num));
|
||||
|
||||
// Now delete using EitherWriter
|
||||
let mut batch = rocksdb.batch();
|
||||
let batch = rocksdb.batch();
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
{
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap();
|
||||
writer.delete_transaction_hash_number(hash).unwrap();
|
||||
}
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
writer.delete_transaction_hash_number(hash).unwrap();
|
||||
|
||||
// Register the batch and commit via provider
|
||||
if !batch.is_empty() {
|
||||
provider.set_pending_rocksdb_batch(batch.into_inner());
|
||||
// Extract and register the batch, then commit via provider
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
provider.commit().unwrap();
|
||||
|
||||
@@ -944,22 +979,19 @@ mod rocksdb_tests {
|
||||
|
||||
// Get the RocksDB batch from the provider
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
let mut batch = rocksdb.batch();
|
||||
let batch = rocksdb.batch();
|
||||
|
||||
// Create provider and EitherWriter
|
||||
let provider = factory.database_provider_rw().unwrap();
|
||||
{
|
||||
let mut writer =
|
||||
EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap();
|
||||
let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap();
|
||||
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
}
|
||||
// Write transaction hash numbers (append_only=false since we're using RocksDB)
|
||||
writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap();
|
||||
writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap();
|
||||
|
||||
// Register the batch with the provider
|
||||
if !batch.is_empty() {
|
||||
provider.set_pending_rocksdb_batch(batch.into_inner());
|
||||
// Extract and register the batch with the provider
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
// Data should NOT be visible yet (batch not committed)
|
||||
|
||||
@@ -653,30 +653,6 @@ impl<'db> RocksTx<'db> {
|
||||
Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData })
|
||||
}
|
||||
|
||||
/// Creates a raw iterator for bidirectional traversal of the specified table.
|
||||
///
|
||||
/// Unlike [`Self::iter_from`], raw iterators support `prev()` for backward navigation.
|
||||
/// Use this for history lookups where checking the previous entry is needed.
|
||||
pub fn raw_iterator_cf<T: Table>(
|
||||
&self,
|
||||
) -> ProviderResult<DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>> {
|
||||
let cf = self.provider.get_cf_handle::<T>()?;
|
||||
Ok(self.inner.raw_iterator_cf(cf))
|
||||
}
|
||||
|
||||
/// Raw iterators surface I/O errors via `status()`, not through `key()`/`value()`.
|
||||
/// Call this after `seek`/`next`/`prev` before interpreting `valid()`.
|
||||
fn raw_iter_status_ok(
|
||||
iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>,
|
||||
) -> ProviderResult<()> {
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
/// Commits the transaction, persisting all changes.
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
self.inner.commit().map_err(|e| {
|
||||
@@ -810,6 +786,30 @@ impl<'db> RocksTx<'db> {
|
||||
lowest_available_block_number,
|
||||
))
|
||||
}
|
||||
|
||||
/// Creates a raw iterator for bidirectional traversal of the specified table.
|
||||
///
|
||||
/// Unlike [`Self::iter_from`], raw iterators support `prev()` for backward navigation.
|
||||
/// Use this for history lookups where checking the previous entry is needed.
|
||||
fn raw_iterator_cf<T: Table>(
|
||||
&self,
|
||||
) -> ProviderResult<DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>> {
|
||||
let cf = self.provider.get_cf_handle::<T>()?;
|
||||
Ok(self.inner.raw_iterator_cf(cf))
|
||||
}
|
||||
|
||||
/// Raw iterators surface I/O errors via `status()`, not through `key()`/`value()`.
|
||||
/// Call this after `seek`/`next`/`prev` before interpreting `valid()`.
|
||||
fn raw_iter_status_ok(
|
||||
iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>,
|
||||
) -> ProviderResult<()> {
|
||||
iter.status().map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over a `RocksDB` table (non-transactional).
|
||||
|
||||
@@ -122,6 +122,9 @@ impl RocksDBBatch {
|
||||
pub const fn is_empty(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Consumes the batch and returns the inner batch (stub returns nothing).
|
||||
pub fn into_inner(self) {}
|
||||
}
|
||||
|
||||
/// A stub builder for `RocksDB` on non-Unix platforms.
|
||||
|
||||
Reference in New Issue
Block a user