From 5e461db751a5dc302e10c7be388dff702f1999c8 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 30 Dec 2025 12:30:28 +0000 Subject: [PATCH] remove mut - Changed RocksDB batch handling to own the batch instead of borrowing it, allowing for safer extraction via `into_raw_rocksdb_batch()`. - Updated the `EitherWriter` to ensure proper registration of the RocksDB batch with the provider. - Simplified the transaction hash number writing process by removing unnecessary mutable borrows. - Enhanced comments for clarity on batch extraction and usage. --- crates/stages/stages/src/stages/tx_lookup.rs | 28 ++-- crates/storage/provider/src/either_writer.rs | 134 +++++++++++------- .../src/providers/rocksdb/provider.rs | 48 +++---- .../provider/src/providers/rocksdb_stub.rs | 3 + 4 files changed, 122 insertions(+), 91 deletions(-) diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 5b55b846ee..087a040f79 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -162,13 +162,13 @@ where #[cfg(all(unix, feature = "rocksdb"))] let rocksdb = provider.rocksdb_provider(); #[cfg(all(unix, feature = "rocksdb"))] - let mut rocksdb_batch = rocksdb.batch(); + let rocksdb_batch = rocksdb.batch(); #[cfg(not(all(unix, feature = "rocksdb")))] - let mut rocksdb_batch = (); + let rocksdb_batch = (); // Create writer that routes to either MDBX or RocksDB based on settings let mut writer = - EitherWriter::new_transaction_hash_numbers(provider, &mut rocksdb_batch)?; + EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; for (index, hash_to_number) in hash_collector.iter()?.enumerate() { let (hash_bytes, number_bytes) = hash_to_number?; @@ -186,13 +186,11 @@ where let tx_num = TxNumber::decompress(&number_bytes)?; writer.put_transaction_hash_number(hash, tx_num, append_only)?; } - // Drop writer to release mutable borrow - drop(writer); - // Register RocksDB batch for commit at provider level + // Extract and register RocksDB batch for commit at provider level #[cfg(all(unix, feature = "rocksdb"))] - if !rocksdb_batch.is_empty() { - provider.set_pending_rocksdb_batch(rocksdb_batch.into_inner()); + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } trace!(target: "sync::stages::transaction_lookup", @@ -223,12 +221,12 @@ where #[cfg(all(unix, feature = "rocksdb"))] let rocksdb = provider.rocksdb_provider(); #[cfg(all(unix, feature = "rocksdb"))] - let mut rocksdb_batch = rocksdb.batch(); + let rocksdb_batch = rocksdb.batch(); #[cfg(not(all(unix, feature = "rocksdb")))] - let mut rocksdb_batch = (); + let rocksdb_batch = (); // Create writer that routes to either MDBX or RocksDB based on settings - let mut writer = EitherWriter::new_transaction_hash_numbers(provider, &mut rocksdb_batch)?; + let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; let static_file_provider = provider.static_file_provider(); let rev_walker = provider @@ -249,13 +247,11 @@ where } } } - // Drop writer to release mutable borrow - drop(writer); - // Register RocksDB batch for commit at provider level + // Extract and register RocksDB batch for commit at provider level #[cfg(all(unix, feature = "rocksdb"))] - if !rocksdb_batch.is_empty() { - provider.set_pending_rocksdb_batch(rocksdb_batch.into_inner()); + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } Ok(UnwindOutput { diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 6e75bba38d..fed3647a75 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -43,12 +43,11 @@ type EitherWriterTy<'a, P, T> = EitherWriter< // Helper types so constructors stay exported even when RocksDB feature is off. // Historical data tables use a write-only RocksDB batch (no read-your-writes needed). -// The batch is borrowed so the caller retains ownership and can access it after the writer -// is dropped (e.g., for registering with the provider). +// The batch is owned by the writer and extracted via `into_raw_rocksdb_batch()` after use. #[cfg(all(unix, feature = "rocksdb"))] -type RocksBatchArg<'a> = &'a mut crate::providers::rocksdb::RocksDBBatch; +type RocksBatchArg = crate::providers::rocksdb::RocksDBBatch; #[cfg(not(all(unix, feature = "rocksdb")))] -type RocksBatchArg<'a> = &'a mut (); +type RocksBatchArg = (); #[cfg(all(unix, feature = "rocksdb"))] type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>; @@ -63,9 +62,10 @@ pub enum EitherWriter<'a, CURSOR, N> { /// Write to static file StaticFile(StaticFileProviderRWRefMut<'a, N>), /// Write to `RocksDB` using a write-only batch (historical tables). - /// The batch is borrowed and committed at provider commit time. + /// The batch is owned and extracted via `into_raw_rocksdb_batch()` after use. + /// Wrapped in `Option` to allow `take()` in extraction while supporting Drop. #[cfg(all(unix, feature = "rocksdb"))] - RocksDB(&'a mut RocksDBBatch), + RocksDB(Option), } impl<'a> EitherWriter<'a, (), ()> { @@ -138,7 +138,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for storages history based on storage settings. pub fn new_storages_history

( provider: &P, - _rocksdb_batch: RocksBatchArg<'a>, + _rocksdb_batch: RocksBatchArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -146,7 +146,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().storages_history_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_batch)); + return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch))); } Ok(EitherWriter::Database(provider.tx_ref().cursor_write::()?)) @@ -155,7 +155,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings. pub fn new_transaction_hash_numbers

( provider: &P, - _rocksdb_batch: RocksBatchArg<'a>, + _rocksdb_batch: RocksBatchArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -163,7 +163,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_batch)); + return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch))); } Ok(EitherWriter::Database( @@ -174,7 +174,7 @@ impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for account history based on storage settings. pub fn new_accounts_history

( provider: &P, - _rocksdb_batch: RocksBatchArg<'a>, + _rocksdb_batch: RocksBatchArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -182,7 +182,7 @@ impl<'a> EitherWriter<'a, (), ()> { { #[cfg(all(unix, feature = "rocksdb"))] if provider.cached_storage_settings().account_history_in_rocksdb { - return Ok(EitherWriter::RocksDB(_rocksdb_batch)); + return Ok(EitherWriter::RocksDB(Some(_rocksdb_batch))); } Ok(EitherWriter::Database(provider.tx_ref().cursor_write::()?)) @@ -216,6 +216,34 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> { Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider), } } + + /// Extracts the raw `RocksDB` write batch from this writer, if it contains one. + /// + /// Returns `Some(WriteBatchWithTransaction)` for [`Self::RocksDB`] variant, + /// `None` for other variants. + /// + /// This is used to defer `RocksDB` commits to the provider level, ensuring all + /// storage commits (MDBX, static files, `RocksDB`) happen atomically in a single place. + /// + /// # Important + /// + /// This method **must** be called for `RocksDB` writers to avoid data loss. + /// Dropping an `EitherWriter::RocksDB` without calling this method will panic + /// in debug builds and log an error in release builds. + #[cfg(all(unix, feature = "rocksdb"))] + #[must_use = "the returned batch must be registered with the provider via set_pending_rocksdb_batch()"] + pub fn into_raw_rocksdb_batch(mut self) -> Option> { + match &mut self { + Self::Database(_) | Self::StaticFile(_) => None, + Self::RocksDB(batch) => batch.take().map(|b| b.into_inner()), + } + } + + /// Stub version for non-RocksDB builds. + #[cfg(not(all(unix, feature = "rocksdb")))] + pub fn into_raw_rocksdb_batch(self) -> Option<()> { + None + } } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> @@ -327,7 +355,10 @@ where } Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.put::(hash, &tx_num), + Self::RocksDB(batch) => batch + .as_mut() + .expect("batch taken") + .put::(hash, &tx_num), } } @@ -342,7 +373,9 @@ where } Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.delete::(hash), + Self::RocksDB(batch) => { + batch.as_mut().expect("batch taken").delete::(hash) + } } } } @@ -361,7 +394,9 @@ where Self::Database(cursor) => Ok(cursor.upsert(key, value)?), Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.put::(key, value), + Self::RocksDB(batch) => { + batch.as_mut().expect("batch taken").put::(key, value) + } } } @@ -376,7 +411,9 @@ where } Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.delete::(key), + Self::RocksDB(batch) => { + batch.as_mut().expect("batch taken").delete::(key) + } } } } @@ -395,7 +432,9 @@ where Self::Database(cursor) => Ok(cursor.upsert(key, value)?), Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.put::(key, value), + Self::RocksDB(batch) => { + batch.as_mut().expect("batch taken").put::(key, value) + } } } @@ -410,7 +449,9 @@ where } Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => batch.delete::(key), + Self::RocksDB(batch) => { + batch.as_mut().expect("batch taken").delete::(key) + } } } } @@ -721,25 +762,22 @@ mod rocksdb_tests { // Get the RocksDB batch from the provider let rocksdb = factory.rocksdb_provider(); - let mut batch = rocksdb.batch(); + let batch = rocksdb.batch(); // Create EitherWriter with RocksDB let provider = factory.database_provider_rw().unwrap(); - { - let mut writer = - EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); - // Verify we got a RocksDB writer - assert!(matches!(writer, EitherWriter::RocksDB(_))); + // Verify we got a RocksDB writer + assert!(matches!(writer, EitherWriter::RocksDB(_))); - // Write transaction hash numbers (append_only=false since we're using RocksDB) - writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); - writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); - } + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); - // Register the batch with provider for commit - if !batch.is_empty() { - provider.set_pending_rocksdb_batch(batch.into_inner()); + // Extract and register the batch with provider for commit + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } // Commit via provider - this commits RocksDB batch too @@ -770,17 +808,14 @@ mod rocksdb_tests { assert_eq!(rocksdb.get::(hash).unwrap(), Some(tx_num)); // Now delete using EitherWriter - let mut batch = rocksdb.batch(); + let batch = rocksdb.batch(); let provider = factory.database_provider_rw().unwrap(); - { - let mut writer = - EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap(); - writer.delete_transaction_hash_number(hash).unwrap(); - } + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); + writer.delete_transaction_hash_number(hash).unwrap(); - // Register the batch and commit via provider - if !batch.is_empty() { - provider.set_pending_rocksdb_batch(batch.into_inner()); + // Extract and register the batch, then commit via provider + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } provider.commit().unwrap(); @@ -944,22 +979,19 @@ mod rocksdb_tests { // Get the RocksDB batch from the provider let rocksdb = factory.rocksdb_provider(); - let mut batch = rocksdb.batch(); + let batch = rocksdb.batch(); // Create provider and EitherWriter let provider = factory.database_provider_rw().unwrap(); - { - let mut writer = - EitherWriter::new_transaction_hash_numbers(&provider, &mut batch).unwrap(); + let mut writer = EitherWriter::new_transaction_hash_numbers(&provider, batch).unwrap(); - // Write transaction hash numbers (append_only=false since we're using RocksDB) - writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); - writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); - } + // Write transaction hash numbers (append_only=false since we're using RocksDB) + writer.put_transaction_hash_number(hash1, tx_num1, false).unwrap(); + writer.put_transaction_hash_number(hash2, tx_num2, false).unwrap(); - // Register the batch with the provider - if !batch.is_empty() { - provider.set_pending_rocksdb_batch(batch.into_inner()); + // Extract and register the batch with the provider + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); } // Data should NOT be visible yet (batch not committed) diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index b1a5a52b7f..0752fd19a9 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -653,30 +653,6 @@ impl<'db> RocksTx<'db> { Ok(RocksTxIter { inner: iter, _marker: std::marker::PhantomData }) } - /// Creates a raw iterator for bidirectional traversal of the specified table. - /// - /// Unlike [`Self::iter_from`], raw iterators support `prev()` for backward navigation. - /// Use this for history lookups where checking the previous entry is needed. - pub fn raw_iterator_cf( - &self, - ) -> ProviderResult>> { - let cf = self.provider.get_cf_handle::()?; - Ok(self.inner.raw_iterator_cf(cf)) - } - - /// Raw iterators surface I/O errors via `status()`, not through `key()`/`value()`. - /// Call this after `seek`/`next`/`prev` before interpreting `valid()`. - fn raw_iter_status_ok( - iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>, - ) -> ProviderResult<()> { - iter.status().map_err(|e| { - ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { - message: e.to_string().into(), - code: -1, - })) - }) - } - /// Commits the transaction, persisting all changes. pub fn commit(self) -> ProviderResult<()> { self.inner.commit().map_err(|e| { @@ -810,6 +786,30 @@ impl<'db> RocksTx<'db> { lowest_available_block_number, )) } + + /// Creates a raw iterator for bidirectional traversal of the specified table. + /// + /// Unlike [`Self::iter_from`], raw iterators support `prev()` for backward navigation. + /// Use this for history lookups where checking the previous entry is needed. + fn raw_iterator_cf( + &self, + ) -> ProviderResult>> { + let cf = self.provider.get_cf_handle::()?; + Ok(self.inner.raw_iterator_cf(cf)) + } + + /// Raw iterators surface I/O errors via `status()`, not through `key()`/`value()`. + /// Call this after `seek`/`next`/`prev` before interpreting `valid()`. + fn raw_iter_status_ok( + iter: &DBRawIteratorWithThreadMode<'_, Transaction<'_, TransactionDB>>, + ) -> ProviderResult<()> { + iter.status().map_err(|e| { + ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + }) + } } /// Iterator over a `RocksDB` table (non-transactional). diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index fe690565d4..e9fa2ac1b5 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -122,6 +122,9 @@ impl RocksDBBatch { pub const fn is_empty(&self) -> bool { true } + + /// Consumes the batch and returns the inner batch (stub returns nothing). + pub fn into_inner(self) {} } /// A stub builder for `RocksDB` on non-Unix platforms.