fix(rocksdb): periodic batch commits in stages to prevent OOM (#21334)

Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
joshieDo
2026-01-22 19:04:56 +00:00
committed by GitHub
parent be5a4ac7a6
commit a0df561117
6 changed files with 134 additions and 11 deletions

View File

@@ -136,7 +136,7 @@ where
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
provider.with_rocksdb_batch(|rocksdb_batch| {
provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
load_account_history(collector, first_sync, &mut writer)
.map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;

View File

@@ -140,7 +140,7 @@ where
info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
provider.with_rocksdb_batch(|rocksdb_batch| {
provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
load_storage_history(collector, first_sync, &mut writer)
.map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;

View File

@@ -1,5 +1,6 @@
/// The bodies stage.
mod bodies;
mod era;
/// The execution stage that generates state diff.
mod execution;
/// The finish stage
@@ -36,9 +37,7 @@ pub use prune::*;
pub use sender_recovery::*;
pub use tx_lookup::*;
mod era;
mod utils;
use utils::*;
#[cfg(test)]

View File

@@ -158,15 +158,13 @@ where
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Create RocksDB batch if feature is enabled
// Auto-commits on threshold; consistency check heals any crash.
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
let rocksdb_batch = rocksdb.batch_with_auto_commit();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
@@ -217,15 +215,12 @@ where
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();

View File

@@ -98,6 +98,13 @@ const DEFAULT_BYTES_PER_SYNC: u64 = 1_048_576;
/// reducing the first few reallocations without over-allocating.
const DEFAULT_COMPRESS_BUF_CAPACITY: usize = 4096;
/// Default auto-commit threshold for batch writes, in bytes (4 GiB).
///
/// When a batch exceeds this size, it is automatically committed to prevent OOM
/// during large bulk writes. The consistency check on startup heals any crash
/// that occurs between auto-commits.
const DEFAULT_AUTO_COMMIT_THRESHOLD: usize = 4 * 1024 * 1024 * 1024;
/// Builder for [`RocksDBProvider`].
pub struct RocksDBBuilder {
path: PathBuf,
@@ -629,6 +636,21 @@ impl RocksDBProvider {
provider: self,
inner: WriteBatchWithTransaction::<true>::default(),
buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
auto_commit_threshold: None,
}
}
/// Creates a new batch that periodically flushes itself to disk.
///
/// Identical to a plain batch except that `auto_commit_threshold` is populated with
/// [`DEFAULT_AUTO_COMMIT_THRESHOLD`] (4 GiB): once the accumulated writes grow past
/// that size, the batch commits and resets in place, bounding memory during large
/// bulk writes. Crash-safety between intermediate commits relies on the startup
/// consistency check.
pub fn batch_with_auto_commit(&self) -> RocksDBBatch<'_> {
    RocksDBBatch {
        auto_commit_threshold: Some(DEFAULT_AUTO_COMMIT_THRESHOLD),
        inner: WriteBatchWithTransaction::<true>::default(),
        buf: Vec::with_capacity(DEFAULT_COMPRESS_BUF_CAPACITY),
        provider: self,
    }
}
@@ -1137,11 +1159,16 @@ impl RocksDBProvider {
/// A write-only batch of puts/deletes destined for a [`RocksDBProvider`].
///
/// Unlike [`RocksTx`], this does NOT support read-your-writes. Use for write-only flows
/// where you don't need to read back uncommitted data within the same operation
/// (e.g., history index writes).
///
/// When `auto_commit_threshold` is set, the batch will automatically commit and reset
/// when the batch size exceeds the threshold. This prevents OOM during large bulk writes.
#[must_use = "batch must be committed"]
pub struct RocksDBBatch<'a> {
    /// Provider the batch writes into when committed.
    provider: &'a RocksDBProvider,
    /// Underlying RocksDB write batch accumulating the queued operations.
    inner: WriteBatchWithTransaction<true>,
    /// Scratch buffer reused across `put` calls for value compression.
    buf: Vec<u8>,
    /// If set, batch auto-commits when size exceeds this threshold (in bytes).
    auto_commit_threshold: Option<usize>,
}
impl fmt::Debug for RocksDBBatch<'_> {
@@ -1160,12 +1187,16 @@ impl fmt::Debug for RocksDBBatch<'_> {
impl<'a> RocksDBBatch<'a> {
/// Queues a `put` of `value` under `key` for table `T`.
///
/// Encodes the key and forwards to the pre-encoded variant. If auto-commit is
/// enabled and the batch exceeds the threshold, commits and resets as a side effect.
pub fn put<T: Table>(&mut self, key: T::Key, value: &T::Value) -> ProviderResult<()> {
    self.put_encoded::<T>(&key.encode(), value)
}
/// Puts a value into the batch using pre-encoded key.
///
/// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
pub fn put_encoded<T: Table>(
&mut self,
key: &<T::Key as Encode>::Encoded,
@@ -1173,12 +1204,43 @@ impl<'a> RocksDBBatch<'a> {
) -> ProviderResult<()> {
let value_bytes = compress_to_buf_or_ref!(self.buf, value).unwrap_or(&self.buf);
self.inner.put_cf(self.provider.get_cf_handle::<T>()?, key, value_bytes);
self.maybe_auto_commit()?;
Ok(())
}
/// Queues a deletion of `key` from table `T`.
///
/// If auto-commit is enabled and the batch exceeds the threshold, commits and resets.
pub fn delete<T: Table>(&mut self, key: T::Key) -> ProviderResult<()> {
    let encoded_key = key.encode();
    let cf = self.provider.get_cf_handle::<T>()?;
    self.inner.delete_cf(cf, encoded_key.as_ref());
    self.maybe_auto_commit()
}
/// Commits and resets the batch if it exceeds the auto-commit threshold.
///
/// Invoked after every `put`/`delete` so batch memory stays bounded. A no-op when
/// auto-commit is disabled or the batch is still under the threshold.
fn maybe_auto_commit(&mut self) -> ProviderResult<()> {
    // Guard clauses: disabled, or not yet large enough to flush.
    let Some(threshold) = self.auto_commit_threshold else { return Ok(()) };
    let batch_size = self.inner.size_in_bytes();
    if batch_size < threshold {
        return Ok(());
    }

    tracing::debug!(
        target: "providers::rocksdb",
        batch_size,
        threshold,
        "Auto-committing RocksDB batch"
    );

    // Swap in a fresh empty batch, then flush the full one to disk.
    let full = std::mem::take(&mut self.inner);
    self.provider.0.db_rw().write_opt(full, &WriteOptions::default()).map_err(|e| {
        ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
            message: e.to_string().into(),
            code: -1,
        }))
    })?;
    Ok(())
}
@@ -1208,6 +1270,11 @@ impl<'a> RocksDBBatch<'a> {
self.inner.is_empty()
}
/// Returns the size of the batch in bytes.
///
/// Delegates to the underlying write batch's accounting; this is the same value the
/// auto-commit check compares against its threshold.
pub fn size_in_bytes(&self) -> usize {
    self.inner.size_in_bytes()
}
/// Returns a reference to the underlying `RocksDB` provider.
pub const fn provider(&self) -> &RocksDBProvider {
self.provider
@@ -2767,4 +2834,40 @@ mod tests {
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
}
#[test]
fn test_batch_auto_commit_on_threshold() {
    let temp_dir = TempDir::new().unwrap();
    let provider =
        RocksDBBuilder::new(temp_dir.path()).with_table::<TestTable>().build().unwrap();

    // Deliberately tiny threshold (1 KiB) so the writes below trip the
    // auto-commit path several times.
    let mut batch = RocksDBBatch {
        provider: &provider,
        inner: WriteBatchWithTransaction::<true>::default(),
        buf: Vec::new(),
        auto_commit_threshold: Some(1024), // 1KB
    };

    let encode_value = |i: u64| format!("value_{i:04}").into_bytes();

    // ~20 bytes per entry * 100 entries ≈ 2 KiB, enough for at least two flushes.
    for i in 0..100u64 {
        batch.put::<TestTable>(i, &encode_value(i)).unwrap();
    }

    // Entries flushed by an auto-commit must be readable even before the final commit.
    assert!(
        provider.get::<TestTable>(0).unwrap().is_some(),
        "Auto-committed data should be visible"
    );

    // Flush whatever is still buffered.
    batch.commit().unwrap();

    // Every entry must now round-trip.
    for i in 0..100u64 {
        assert_eq!(provider.get::<TestTable>(i).unwrap(), Some(encode_value(i)));
    }
}
}

View File

@@ -58,4 +58,30 @@ pub trait RocksDBProviderFactory {
Ok(result)
}
}
/// Runs `f` with a `RocksDB` batch that flushes itself once it grows past the
/// auto-commit threshold.
///
/// Unlike [`Self::with_rocksdb_batch`], the batch handed to `f` commits and resets
/// automatically when it exceeds the size threshold, preventing OOM during large
/// bulk writes. The consistency check on startup heals any crash between auto-commits.
fn with_rocksdb_batch_auto_commit<F, R>(&self, f: F) -> ProviderResult<R>
where
    F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
    #[cfg(all(unix, feature = "rocksdb"))]
    {
        let provider = self.rocksdb_provider();
        let (value, leftover) = f(provider.batch_with_auto_commit())?;
        // Stash any batch the closure chose not to commit so it can be
        // finished alongside the enclosing transaction.
        if let Some(pending) = leftover {
            self.set_pending_rocksdb_batch(pending);
        }
        Ok(value)
    }
    #[cfg(not(all(unix, feature = "rocksdb")))]
    {
        // Without the rocksdb feature the batch argument is `()`.
        f(()).map(|(value, _)| value)
    }
}
}