diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 92fa5f3244..5a3ba750d5 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -136,7 +136,7 @@ where info!(target: "sync::stages::index_account_history::exec", "Loading indices into database"); - provider.with_rocksdb_batch(|rocksdb_batch| { + provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| { let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?; load_account_history(collector, first_sync, &mut writer) .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?; diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 36f2f6ede6..08192c8871 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -140,7 +140,7 @@ where info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database"); - provider.with_rocksdb_batch(|rocksdb_batch| { + provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| { let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; load_storage_history(collector, first_sync, &mut writer) .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?; diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 8249d74914..1145af1b93 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -1,5 +1,6 @@ /// The bodies stage. mod bodies; +mod era; /// The execution stage that generates state diff. 
mod execution; /// The finish stage @@ -36,9 +37,7 @@ pub use prune::*; pub use sender_recovery::*; pub use tx_lookup::*; -mod era; mod utils; - use utils::*; #[cfg(test)] diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 087a040f79..bf056a655b 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -158,15 +158,13 @@ where let append_only = provider.count_entries::()?.is_zero(); - // Create RocksDB batch if feature is enabled + // Auto-commits on threshold; consistency check heals any crash. #[cfg(all(unix, feature = "rocksdb"))] let rocksdb = provider.rocksdb_provider(); #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_batch = rocksdb.batch(); + let rocksdb_batch = rocksdb.batch_with_auto_commit(); #[cfg(not(all(unix, feature = "rocksdb")))] let rocksdb_batch = (); - - // Create writer that routes to either MDBX or RocksDB based on settings let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; @@ -217,15 +215,12 @@ where ) -> Result { let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); - // Create RocksDB batch if feature is enabled #[cfg(all(unix, feature = "rocksdb"))] let rocksdb = provider.rocksdb_provider(); #[cfg(all(unix, feature = "rocksdb"))] let rocksdb_batch = rocksdb.batch(); #[cfg(not(all(unix, feature = "rocksdb")))] let rocksdb_batch = (); - - // Create writer that routes to either MDBX or RocksDB based on settings let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; let static_file_provider = provider.static_file_provider(); diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index d9aba31e5a..75b9e6fa5d 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -98,6 +98,13 
@@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576; /// reducing the first few reallocations without over-allocating. const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096; +/// Default auto-commit threshold for batch writes (4 GiB; `usize::MAX` on 32-bit targets). +/// +/// When a batch reaches this size, it is automatically committed to prevent OOM +/// during large bulk writes. The consistency check on startup heals any crash +/// that occurs between auto-commits. +const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4usize.saturating_mul(1024 * 1024 * 1024); + /// Builder for [`RocksDBProvider`]. pub struct RocksDBBuilder { path: PathBuf, @@ -629,6 +636,21 @@ impl RocksDBProvider { provider: self, inner: WriteBatchWithTransaction::::default(), buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY), + auto_commit_threshold: None, + } + } + + /// Creates a new batch with auto-commit enabled. + /// + /// When the batch size reaches the threshold (4 GiB), the batch is automatically + /// committed and reset. This prevents OOM during large bulk writes while maintaining + /// crash-safety via the consistency check on startup. + pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> { + RocksDBBatch { + provider: self, + inner: WriteBatchWithTransaction::::default(), + buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY), + auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD), } } @@ -1137,11 +1159,16 @@ impl RocksDBProvider { /// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows /// where you don't need to read back uncommitted data within the same operation /// (e.g., history index writes). +/// +/// When `auto_commit_threshold` is set, the batch will automatically commit and reset +/// when the batch size reaches the threshold. This prevents OOM during large bulk writes. 
#[must_use = "batch must be committed"] pub struct RocksDBBatch<'a> { provider: &'a RocksDBProvider, inner: WriteBatchWithTransaction, buf: Vec, + /// If set, batch auto-commits when size reaches this threshold (in bytes). + auto_commit_threshold: Option, } impl fmt::Debug for RocksDBBatch<'_> { @@ -1160,12 +1187,16 @@ impl fmt::Debug for RocksDBBatch<'_> { impl<'a> RocksDBBatch<'a> { /// Puts a value into the batch. + /// + /// If auto-commit is enabled and the batch reaches the threshold, commits and resets. pub fn put(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> { let encoded_key = key.encode(); self.put_encoded::(&encoded_key, value) } /// Puts a value into the batch using pre-encoded key. + /// + /// If auto-commit is enabled and the batch reaches the threshold, commits and resets. pub fn put_encoded( &mut self, key: &::Encoded, @@ -1173,12 +1204,43 @@ impl<'a> RocksDBBatch<'a> { ) -> ProviderResult<()> { let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf); self.inner.put_cf(self.provider.get_cf_handle::()?, key, value_bytes); + self.maybe_auto_commit()?; Ok(()) } /// Deletes a value from the batch. + /// + /// If auto-commit is enabled and the batch reaches the threshold, commits and resets. pub fn delete(&mut self, key: T::Key) -> ProviderResult<()> { self.inner.delete_cf(self.provider.get_cf_handle::()?, key.encode().as_ref()); + self.maybe_auto_commit()?; + Ok(()) + } + + /// Commits and resets the batch if it reaches the auto-commit threshold. + /// + /// This is called after each `put` or `delete` operation to prevent unbounded memory growth. + /// Returns immediately if auto-commit is disabled or threshold not reached. 
+ fn maybe_auto_commit(&mut self) -> ProviderResult<()> { + if let Some(threshold) = self.auto_commit_threshold && + self.inner.size_in_bytes() >= threshold + { + tracing::debug!( + target: "providers::rocksdb", + batch_size = self.inner.size_in_bytes(), + threshold, + "Auto-committing RocksDB batch" + ); + let old_batch = std::mem::take(&mut self.inner); + self.provider.0.db_rw().write_opt(old_batch, &WriteOptions::default()).map_err( + |e| { + ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + }, + )?; + } Ok(()) } @@ -1208,6 +1270,11 @@ impl<'a> RocksDBBatch<'a> { self.inner.is_empty() } + /// Returns the size of the batch in bytes. + pub fn size_in_bytes(&self) -> usize { + self.inner.size_in_bytes() + } + /// Returns a reference to the underlying `RocksDB` provider. pub const fn provider(&self) -> &RocksDBProvider { self.provider @@ -2767,4 +2834,40 @@ mod tests { assert_eq!(shards[1].0.highest_block_number, u64::MAX); assert_eq!(shards[1].1.iter().collect::>(), (51..=75).collect::>()); } + + #[test] + fn test_batch_auto_commit_on_threshold() { + let temp_dir = TempDir::new().unwrap(); + let provider = + RocksDBBuilder::new(temp_dir.path()).with_table::().build().unwrap(); + + // Create batch with tiny threshold (1KB) to force auto-commits + let mut batch = RocksDBBatch { + provider: &provider, + inner: WriteBatchWithTransaction::::default(), + buf: Vec::new(), + auto_commit_threshold: Some(1024), // 1KB + }; + + // Write entries until we exceed threshold multiple times + // Each entry is ~20 bytes, so 100 entries = ~2KB = 2 auto-commits + for i in 0..100u64 { + let value = format!("value_{i:04}").into_bytes(); + batch.put::(i, &value).unwrap(); + } + + // Data should already be visible (auto-committed) even before final commit + // At least some entries should be readable + let first_visible = provider.get::(0).unwrap(); + assert!(first_visible.is_some(), "Auto-committed data should be 
visible"); + + // Final commit for remaining batch + batch.commit().unwrap(); + + // All entries should now be visible + for i in 0..100u64 { + let value = format!("value_{i:04}").into_bytes(); + assert_eq!(provider.get::(i).unwrap(), Some(value)); + } + } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index b4abacd86e..07c9b47866 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -58,4 +58,30 @@ pub trait RocksDBProviderFactory { Ok(result) } } + + /// Executes a closure with a `RocksDB` batch that auto-commits on threshold. + /// + /// Unlike [`Self::with_rocksdb_batch`], this uses a batch that automatically commits + /// when it reaches the size threshold, preventing OOM during large bulk writes. + /// The consistency check on startup heals any crash between auto-commits. + fn with_rocksdb_batch_auto_commit(&self, f: F) -> ProviderResult + where + F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option)>, + { + #[cfg(all(unix, feature = "rocksdb"))] + { + let rocksdb = self.rocksdb_provider(); + let batch = rocksdb.batch_with_auto_commit(); + let (result, raw_batch) = f(batch)?; + if let Some(b) = raw_batch { + self.set_pending_rocksdb_batch(b); + } + Ok(result) + } + #[cfg(not(all(unix, feature = "rocksdb")))] + { + let (result, _) = f(())?; + Ok(result) + } + } }