From b814893221d55a252d37db59839e455c927e0de7 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 23 Jan 2026 17:02:53 +0000 Subject: [PATCH] feat(stages): flush RocksDB at end of history and tx_lookup stages (#21367) --- .../src/stages/index_account_history.rs | 7 +++- .../src/stages/index_storage_history.rs | 6 ++- crates/stages/stages/src/stages/tx_lookup.rs | 6 ++- crates/storage/errors/src/db.rs | 2 + .../src/providers/rocksdb/provider.rs | 39 +++++++++++++++++++ .../provider/src/providers/rocksdb_stub.rs | 7 ++++ 6 files changed, 63 insertions(+), 4 deletions(-) diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 5a3ba750d5..91334c10cf 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -1,7 +1,7 @@ use super::collect_account_history_indices; use crate::stages::utils::{collect_history_indices, load_account_history}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; -use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut}; +use reth_db_api::{models::ShardedKey, table::Table, tables, transaction::DbTxMut}; use reth_provider::{ DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory, StorageSettingsCache, @@ -111,7 +111,6 @@ where // but this is safe for first_sync because if we crash before commit, the // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The // source data (changesets) is intact. - #[cfg(all(unix, feature = "rocksdb"))] provider.rocksdb_provider().clear::()?; } else { provider.tx_ref().clear::()?; @@ -143,6 +142,10 @@ where Ok(((), writer.into_raw_rocksdb_batch())) })?; + if use_rocksdb { + provider.rocksdb_provider().flush(&[tables::AccountsHistory::NAME])?; + } + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 08192c8871..0990575500 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -3,6 +3,7 @@ use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, + table::Table, tables, transaction::DbTxMut, }; @@ -115,7 +116,6 @@ where // but this is safe for first_sync because if we crash before commit, the // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The // source data (changesets) is intact. - #[cfg(all(unix, feature = "rocksdb"))] provider.rocksdb_provider().clear::()?; } else { provider.tx_ref().clear::()?; @@ -147,6 +147,10 @@ where Ok(((), writer.into_raw_rocksdb_batch())) })?; + if use_rocksdb { + provider.rocksdb_provider().flush(&[tables::StoragesHistory::NAME])?; + } + Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index bf056a655b..404cecae56 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -3,7 +3,7 @@ use alloy_primitives::{TxHash, TxNumber}; use num_traits::Zero; use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db_api::{ - table::{Decode, Decompress, Value}, + table::{Decode, Decompress, Table, Value}, tables, transaction::DbTxMut, }; @@ -200,6 +200,10 @@ where } } + if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb { + provider.rocksdb_provider().flush(&[tables::TransactionHashNumbers::NAME])?; + } + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), diff --git a/crates/storage/errors/src/db.rs b/crates/storage/errors/src/db.rs index 300491ed8a..5f6da8f347 100644 --- a/crates/storage/errors/src/db.rs +++ b/crates/storage/errors/src/db.rs @@ -115,6 +115,8 @@ pub enum DatabaseWriteOperation { PutUpsert, /// Put append. PutAppend, + /// Flush to disk. + Flush, } /// Database log level. diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 75b9e6fa5d..7f322c7492 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -833,6 +833,45 @@ impl RocksDBProvider { self.0.table_stats() } + /// Flushes pending writes for the specified tables to disk. + /// + /// This performs a flush of: + /// 1. The Write-Ahead Log (WAL) with sync + /// 2. The column family memtables for the specified table names to SST files + /// + /// After this call completes, all data for the specified tables is durably persisted to disk. + /// + /// # Panics + /// Panics if the provider is in read-only mode. + #[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))] + pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> { + let db = self.0.db_rw(); + + db.flush_wal(true).map_err(|e| { + ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError { + info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 }, + operation: DatabaseWriteOperation::Flush, + table_name: "WAL", + key: Vec::new(), + }))) + })?; + + for cf_name in tables { + if let Some(cf) = db.cf_handle(cf_name) { + db.flush_cf(&cf).map_err(|e| { + ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError { + info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 }, + operation: DatabaseWriteOperation::Flush, + table_name: cf_name, + key: Vec::new(), + }))) + })?; + } + } + + Ok(()) + } + /// Creates a raw iterator over all entries in the specified table. /// /// Returns raw `(key_bytes, value_bytes)` pairs without decoding. diff --git a/crates/storage/provider/src/providers/rocksdb_stub.rs b/crates/storage/provider/src/providers/rocksdb_stub.rs index 965877db2e..31c38103e3 100644 --- a/crates/storage/provider/src/providers/rocksdb_stub.rs +++ b/crates/storage/provider/src/providers/rocksdb_stub.rs @@ -88,6 +88,13 @@ impl RocksDBProvider { pub const fn clear(&self) -> ProviderResult<()> { Ok(()) } + + /// Flushes all pending writes to disk (stub implementation). + /// + /// This is a no-op since there is no `RocksDB` when the feature is disabled. + pub const fn flush(&self, _tables: &[&'static str]) -> ProviderResult<()> { + Ok(()) + } } impl DatabaseMetrics for RocksDBProvider {