feat(stages): flush RocksDB at end of history and tx_lookup stages (#21367)

This commit is contained in:
joshieDo
2026-01-23 17:02:53 +00:00
committed by GitHub
parent fcef82261d
commit b814893221
6 changed files with 63 additions and 4 deletions

View File

@@ -1,7 +1,7 @@
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_db_api::{models::ShardedKey, table::Table, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
@@ -111,7 +111,6 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
} else {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
@@ -143,6 +142,10 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
if use_rocksdb {
provider.rocksdb_provider().flush(&[tables::AccountsHistory::NAME])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -3,6 +3,7 @@ use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
table::Table,
tables,
transaction::DbTxMut,
};
@@ -115,7 +116,6 @@ where
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
} else {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
@@ -147,6 +147,10 @@ where
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
if use_rocksdb {
provider.rocksdb_provider().flush(&[tables::StoragesHistory::NAME])?;
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}

View File

@@ -3,7 +3,7 @@ use alloy_primitives::{TxHash, TxNumber};
use num_traits::Zero;
use reth_config::config::{EtlConfig, TransactionLookupConfig};
use reth_db_api::{
table::{Decode, Decompress, Value},
table::{Decode, Decompress, Table, Value},
tables,
transaction::DbTxMut,
};
@@ -200,6 +200,10 @@ where
}
}
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
provider.rocksdb_provider().flush(&[tables::TransactionHashNumbers::NAME])?;
}
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(input.target())
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),

View File

@@ -115,6 +115,8 @@ pub enum DatabaseWriteOperation {
PutUpsert,
/// Put append.
PutAppend,
/// Flush to disk.
Flush,
}
/// Database log level.

View File

@@ -833,6 +833,45 @@ impl RocksDBProvider {
self.0.table_stats()
}
/// Flushes pending writes for the specified tables to disk.
///
/// This performs a flush of:
/// 1. The Write-Ahead Log (WAL) with sync
/// 2. The column family memtables for the specified table names to SST files
///
/// After this call completes, all data for the specified tables is durably persisted to disk.
///
/// # Panics
/// Panics if the provider is in read-only mode.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all, fields(tables = ?tables))]
pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
let db = self.0.db_rw();
db.flush_wal(true).map_err(|e| {
ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
operation: DatabaseWriteOperation::Flush,
table_name: "WAL",
key: Vec::new(),
})))
})?;
for cf_name in tables {
if let Some(cf) = db.cf_handle(cf_name) {
db.flush_cf(&cf).map_err(|e| {
ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
operation: DatabaseWriteOperation::Flush,
table_name: cf_name,
key: Vec::new(),
})))
})?;
}
}
Ok(())
}
/// Creates a raw iterator over all entries in the specified table.
///
/// Returns raw `(key_bytes, value_bytes)` pairs without decoding.

View File

@@ -88,6 +88,13 @@ impl RocksDBProvider {
pub const fn clear<T>(&self) -> ProviderResult<()> {
Ok(())
}
/// Flushes all pending writes to disk (stub implementation).
///
/// This is a no-op since there is no `RocksDB` when the feature is disabled.
pub const fn flush(&self, _tables: &[&'static str]) -> ProviderResult<()> {
Ok(())
}
}
impl DatabaseMetrics for RocksDBProvider {