fix(stages): commit RocksDB batches before flush and configure immediate WAL cleanup (#21374)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Author: joshieDo
Date: 2026-01-23 19:28:52 +00:00
Committed by: GitHub
Parent: decb56fae1
Commit: ab418642b4
9 changed files with 69 additions and 19 deletions

View File

@@ -1,7 +1,9 @@
 use super::collect_account_history_indices;
 use crate::stages::utils::{collect_history_indices, load_account_history};
 use reth_config::config::{EtlConfig, IndexHistoryConfig};
-use reth_db_api::{models::ShardedKey, table::Table, tables, transaction::DbTxMut};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
+use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
 use reth_provider::{
     DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
     RocksDBProviderFactory, StorageSettingsCache,
@@ -142,8 +144,10 @@ where
             Ok(((), writer.into_raw_rocksdb_batch()))
         })?;
         #[cfg(all(unix, feature = "rocksdb"))]
         if use_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::AccountsHistory::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
         }
 
         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
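
The same reordering shows up in all three stage diffs in this commit: commit the pending batches first, then flush. Below is a minimal standalone sketch of why the order matters, written against the plain rust-rocksdb `DB`/`WriteBatch` API rather than reth's provider plumbing (the path, key, and value are placeholders): a memtable flush only persists writes that have already been applied to the database, so a batch still sitting in a pending queue contributes nothing to the flush.

use rocksdb::{Options, WriteBatch, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/rocksdb-ordering-demo")?;

    let mut batch = WriteBatch::default();
    batch.put(b"account-shard", b"history-index");

    // 1. Apply the pending batch first -- the analogue of
    //    commit_pending_rocksdb_batches() in the diff above.
    db.write(batch)?;
    // 2. Only now does flushing move those writes from the memtable into an
    //    SST file; flushing before the write would have persisted nothing.
    db.flush()?;
    Ok(())
}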

View File

@@ -1,9 +1,10 @@
 use super::{collect_history_indices, collect_storage_history_indices};
 use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
 use reth_config::config::{EtlConfig, IndexHistoryConfig};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
 use reth_db_api::{
     models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
-    table::Table,
     tables,
     transaction::DbTxMut,
 };
@@ -147,8 +148,10 @@ where
             Ok(((), writer.into_raw_rocksdb_batch()))
         })?;
         #[cfg(all(unix, feature = "rocksdb"))]
         if use_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::StoragesHistory::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
         }
 
         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })

View File

@@ -2,8 +2,10 @@ use alloy_eips::eip2718::Encodable2718;
 use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
 use reth_db_api::{
-    table::{Decode, Decompress, Table, Value},
+    table::{Decode, Decompress, Value},
     tables,
     transaction::DbTxMut,
 };
@@ -200,8 +202,10 @@ where
             }
         }
         #[cfg(all(unix, feature = "rocksdb"))]
         if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::TransactionHashNumbers::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
         }
 
         Ok(ExecOutput {

View File

@@ -186,6 +186,11 @@ impl<N: ProviderNodeTypes> RocksDBProviderFactory for BlockchainProvider<N> {
     fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
         unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
+    }
 }
 
 impl<N: ProviderNodeTypes> HeaderProvider for BlockchainProvider<N> {

View File

@@ -171,6 +171,11 @@ impl<N: NodeTypesWithDB> RocksDBProviderFactory for ProviderFactory<N> {
     fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
         unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
+    }
 }
 
 impl<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>> ProviderFactory<N> {

View File

@@ -321,6 +321,15 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
     fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
        self.pending_rocksdb_batches.lock().push(batch);
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
+        for batch in batches {
+            self.rocksdb_provider.commit_batch(batch)?;
+        }
+        Ok(())
+    }
 }
 
 impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider

View File

@@ -175,6 +175,11 @@ impl RocksDBBuilder {
         options.set_log_level(log_level);
 
+        // Delete obsolete WAL files immediately after all column families have flushed.
+        // Both set to 0 means "delete ASAP, no archival".
+        options.set_wal_ttl_seconds(0);
+        options.set_wal_size_limit_mb(0);
+
         // Statistics can view from RocksDB log file
         if enable_statistics {
             options.enable_statistics();
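
The two settings above can be tried in isolation. A hedged sketch using the stock rust-rocksdb `Options` API (the surrounding builder is reth-specific and elided; `wal_cleanup_options` is a name invented for this example):

use rocksdb::Options;

fn wal_cleanup_options() -> Options {
    let mut options = Options::default();
    options.create_if_missing(true);
    // ttl_seconds = 0 and size_limit_mb = 0 together mean: delete obsolete
    // WAL files as soon as every column family has flushed past them, with
    // no time-based archival window and no size-based retention.
    options.set_wal_ttl_seconds(0);
    options.set_wal_size_limit_mb(0);
    options
}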
@@ -836,8 +841,8 @@ impl RocksDBProvider {
     /// Flushes pending writes for the specified tables to disk.
     ///
     /// This performs a flush of:
-    /// 1. The Write-Ahead Log (WAL) with sync
-    /// 2. The column family memtables for the specified table names to SST files
+    /// 1. The column family memtables for the specified table names to SST files
+    /// 2. The Write-Ahead Log (WAL) with sync
     ///
     /// After this call completes, all data for the specified tables is durably persisted to disk.
     ///
@@ -847,15 +852,6 @@ impl RocksDBProvider {
     pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
         let db = self.0.db_rw();
-        db.flush_wal(true).map_err(|e| {
-            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
-                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
-                operation: DatabaseWriteOperation::Flush,
-                table_name: "WAL",
-                key: Vec::new(),
-            })))
-        })?;
-
         for cf_name in tables {
             if let Some(cf) = db.cf_handle(cf_name) {
                 db.flush_cf(&cf).map_err(|e| {
@@ -869,6 +865,15 @@ impl RocksDBProvider {
             }
         }
 
+        db.flush_wal(true).map_err(|e| {
+            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
+                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
+                operation: DatabaseWriteOperation::Flush,
+                table_name: "WAL",
+                key: Vec::new(),
+            })))
+        })?;
+
         Ok(())
     }
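
Stripped of the `ProviderError` mapping, the reordered flush body reduces to the sketch below, written against the plain rust-rocksdb `DB` type as a stand-in for reth's handle: memtables are flushed first, so the single synced WAL flush at the end covers everything written before it.

use rocksdb::DB;

fn flush_tables(db: &DB, tables: &[&str]) -> Result<(), rocksdb::Error> {
    // 1. Persist each requested column family's memtable to SST files.
    for cf_name in tables {
        if let Some(cf) = db.cf_handle(cf_name) {
            db.flush_cf(cf)?;
        }
    }
    // 2. Sync the WAL last, so one fsync covers every write made above.
    db.flush_wal(true)?;
    Ok(())
}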

View File

@@ -31,7 +31,10 @@ impl<C: Send + Sync, N: NodePrimitives> RocksDBProviderFactory for NoopProvider<C, N> {
     }
 
     #[cfg(all(unix, feature = "rocksdb"))]
-    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {
-        // No-op for NoopProvider
-    }
+    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        Ok(())
+    }
 }

View File

@@ -19,6 +19,14 @@ pub trait RocksDBProviderFactory {
     #[cfg(all(unix, feature = "rocksdb"))]
     fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>);
 
+    /// Takes all pending `RocksDB` batches and commits them.
+    ///
+    /// This drains the pending batches from the lock and commits each one using the `RocksDB`
+    /// provider. Can be called before flush to persist `RocksDB` writes independently of the
+    /// full commit path.
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
+
     /// Executes a closure with a `RocksDB` transaction for reading.
     ///
     /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
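
The drain-and-commit contract documented above can be mirrored in miniature. In the self-contained sketch below, `std::sync::Mutex` and plain `WriteBatch` stand in for reth's lock type and `WriteBatchWithTransaction<true>`, and `PendingBatches` is a name invented for the example:

use std::sync::Mutex;

use rocksdb::{WriteBatch, DB};

struct PendingBatches {
    pending: Mutex<Vec<WriteBatch>>,
}

impl PendingBatches {
    /// Analogue of set_pending_rocksdb_batch: stash a batch for later.
    fn push(&self, batch: WriteBatch) {
        self.pending.lock().unwrap().push(batch);
    }

    /// Analogue of commit_pending_rocksdb_batches: take every pending batch,
    /// leaving the queue empty, and write each one to the database.
    fn commit_all(&self, db: &DB) -> Result<(), rocksdb::Error> {
        let batches = std::mem::take(&mut *self.pending.lock().unwrap());
        for batch in batches {
            db.write(batch)?;
        }
        Ok(())
    }
}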
@@ -154,6 +162,10 @@ mod tests {
     }
 
     fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction<true>) {}
+
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        Ok(())
+    }
 }
 
 #[test]