diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs
index 91334c10cf..d6871475ce 100644
--- a/crates/stages/stages/src/stages/index_account_history.rs
+++ b/crates/stages/stages/src/stages/index_account_history.rs
@@ -1,7 +1,9 @@
 use super::collect_account_history_indices;
 use crate::stages::utils::{collect_history_indices, load_account_history};
 use reth_config::config::{EtlConfig, IndexHistoryConfig};
-use reth_db_api::{models::ShardedKey, table::Table, tables, transaction::DbTxMut};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
+use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
 use reth_provider::{
     DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
     RocksDBProviderFactory, StorageSettingsCache,
@@ -142,8 +144,10 @@ where
             Ok(((), writer.into_raw_rocksdb_batch()))
         })?;

+        #[cfg(all(unix, feature = "rocksdb"))]
         if use_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::AccountsHistory::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::AccountsHistory.name()])?;
         }

         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs
index 0990575500..7b7d39f6d6 100644
--- a/crates/stages/stages/src/stages/index_storage_history.rs
+++ b/crates/stages/stages/src/stages/index_storage_history.rs
@@ -1,9 +1,10 @@
 use super::{collect_history_indices, collect_storage_history_indices};
 use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
 use reth_config::config::{EtlConfig, IndexHistoryConfig};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
 use reth_db_api::{
     models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
-    table::Table,
     tables,
     transaction::DbTxMut,
 };
@@ -147,8 +148,10 @@ where
             Ok(((), writer.into_raw_rocksdb_batch()))
         })?;

+        #[cfg(all(unix, feature = "rocksdb"))]
         if use_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::StoragesHistory::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::StoragesHistory.name()])?;
         }

         Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs
index 404cecae56..1af65fb8d7 100644
--- a/crates/stages/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/stages/src/stages/tx_lookup.rs
@@ -2,8 +2,10 @@ use alloy_eips::eip2718::Encodable2718;
 use alloy_primitives::{TxHash, TxNumber};
 use num_traits::Zero;
 use reth_config::config::{EtlConfig, TransactionLookupConfig};
+#[cfg(all(unix, feature = "rocksdb"))]
+use reth_db_api::Tables;
 use reth_db_api::{
-    table::{Decode, Decompress, Table, Value},
+    table::{Decode, Decompress, Value},
     tables,
     transaction::DbTxMut,
 };
@@ -200,8 +202,10 @@ where
             }
         }

+        #[cfg(all(unix, feature = "rocksdb"))]
         if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
-            provider.rocksdb_provider().flush(&[tables::TransactionHashNumbers::NAME])?;
+            provider.commit_pending_rocksdb_batches()?;
+            provider.rocksdb_provider().flush(&[Tables::TransactionHashNumbers.name()])?;
         }

         Ok(ExecOutput {
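Note on the three stage hunks above: each call site is now gated on `cfg(all(unix, feature = "rocksdb"))` to match the gating of the new `Tables` import and of the trait method, and each one commits the queued write batches before flushing the column family. Below is a minimal sketch of that shared commit-then-flush shape; the closures and the `StageResult` alias are stand-ins for illustration, not reth APIs.

```rust
/// Stand-in result alias for illustration; the real call sites return reth's
/// `ProviderResult` / stage errors.
type StageResult = Result<(), Box<dyn std::error::Error>>;

/// Sketch of the ordering used by the stages above: apply the queued RocksDB write
/// batches first, then flush the affected column family. The closures stand in for
/// `provider.commit_pending_rocksdb_batches()` and `provider.rocksdb_provider().flush(..)`.
#[cfg(all(unix, feature = "rocksdb"))]
fn commit_then_flush(
    commit_pending: impl FnOnce() -> StageResult,
    flush_tables: impl FnOnce(&[&'static str]) -> StageResult,
    table: &'static str,
) -> StageResult {
    // Ordering matters: the batches queued via `set_pending_rocksdb_batch` must reach
    // RocksDB before the explicit flush, otherwise the flush only persists whatever was
    // already in the memtables and the freshly built index entries are not yet durable.
    commit_pending()?;
    flush_tables(&[table])
}
```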
diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs
index 1644219428..141a5074b6 100644
--- a/crates/storage/provider/src/providers/blockchain_provider.rs
+++ b/crates/storage/provider/src/providers/blockchain_provider.rs
@@ -186,6 +186,11 @@ impl RocksDBProviderFactory for BlockchainProvider {
     fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {
         unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::set_pending_rocksdb_batch instead")
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        unimplemented!("BlockchainProvider wraps ProviderFactory - use DatabaseProvider::commit_pending_rocksdb_batches instead")
+    }
 }

 impl HeaderProvider for BlockchainProvider {
diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs
index b35af670c9..b9f7f1ccdd 100644
--- a/crates/storage/provider/src/providers/database/mod.rs
+++ b/crates/storage/provider/src/providers/database/mod.rs
@@ -171,6 +171,11 @@ impl RocksDBProviderFactory for ProviderFactory {
     fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {
         unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::set_pending_rocksdb_batch instead")
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        unimplemented!("ProviderFactory is a factory, not a provider - use DatabaseProvider::commit_pending_rocksdb_batches instead")
+    }
 }

 impl>> ProviderFactory {
diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs
index 940424da84..d693791fa5 100644
--- a/crates/storage/provider/src/providers/database/provider.rs
+++ b/crates/storage/provider/src/providers/database/provider.rs
@@ -321,6 +321,15 @@ impl RocksDBProviderFactory for DatabaseProvider {
     fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction) {
         self.pending_rocksdb_batches.lock().push(batch);
     }
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
+        for batch in batches {
+            self.rocksdb_provider.commit_batch(batch)?;
+        }
+        Ok(())
+    }
 }

 impl> ChainSpecProvider
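The `DatabaseProvider` implementation above drains `pending_rocksdb_batches` with `std::mem::take`, so the mutex guard is dropped before any batch is committed. Below is a self-contained sketch of that drain pattern using std-only stand-in types; `WriteBatch`, `Provider`, and the counting "commit" are illustrative placeholders for `rocksdb::WriteBatchWithTransaction`, `DatabaseProvider`, and `RocksDBProvider::commit_batch`.

```rust
use std::{mem, sync::Mutex};

/// Placeholder for `rocksdb::WriteBatchWithTransaction`.
struct WriteBatch(Vec<(Vec<u8>, Vec<u8>)>);

/// Placeholder for the provider that queues batches produced by ETL writers.
struct Provider {
    pending: Mutex<Vec<WriteBatch>>,
}

impl Provider {
    /// Queue a batch, mirroring `set_pending_rocksdb_batch`.
    fn set_pending(&self, batch: WriteBatch) {
        self.pending.lock().unwrap().push(batch);
    }

    /// Drain and commit, mirroring `commit_pending_rocksdb_batches`.
    fn commit_pending(&self) -> Result<usize, String> {
        // `mem::take` swaps the queue for an empty Vec; the temporary guard is dropped at
        // the end of this statement, so the commits below run without holding the lock.
        let batches = mem::take(&mut *self.pending.lock().unwrap());
        let mut committed = 0;
        for batch in batches {
            // A real implementation hands each batch to RocksDB here.
            committed += batch.0.len();
        }
        Ok(committed)
    }
}

fn main() {
    let provider = Provider { pending: Mutex::new(Vec::new()) };
    provider.set_pending(WriteBatch(vec![(b"key".to_vec(), b"value".to_vec())]));
    assert_eq!(provider.commit_pending().unwrap(), 1);
    // A second call finds nothing left to commit.
    assert_eq!(provider.commit_pending().unwrap(), 0);
}
```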
diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs
index 7f322c7492..0cc85f43c4 100644
--- a/crates/storage/provider/src/providers/rocksdb/provider.rs
+++ b/crates/storage/provider/src/providers/rocksdb/provider.rs
@@ -175,6 +175,11 @@ impl RocksDBBuilder {

         options.set_log_level(log_level);

+        // Delete obsolete WAL files immediately after all column families have flushed.
+        // Both set to 0 means "delete ASAP, no archival".
+        options.set_wal_ttl_seconds(0);
+        options.set_wal_size_limit_mb(0);
+
         // Statistics can view from RocksDB log file
         if enable_statistics {
             options.enable_statistics();
@@ -836,8 +841,8 @@ impl RocksDBProvider {
     /// Flushes pending writes for the specified tables to disk.
     ///
     /// This performs a flush of:
-    /// 1. The Write-Ahead Log (WAL) with sync
-    /// 2. The column family memtables for the specified table names to SST files
+    /// 1. The column family memtables for the specified table names to SST files
+    /// 2. The Write-Ahead Log (WAL) with sync
     ///
     /// After this call completes, all data for the specified tables is durably persisted to disk.
     ///
@@ -847,15 +852,6 @@ impl RocksDBProvider {
     pub fn flush(&self, tables: &[&'static str]) -> ProviderResult<()> {
         let db = self.0.db_rw();

-        db.flush_wal(true).map_err(|e| {
-            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
-                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
-                operation: DatabaseWriteOperation::Flush,
-                table_name: "WAL",
-                key: Vec::new(),
-            })))
-        })?;
-
         for cf_name in tables {
             if let Some(cf) = db.cf_handle(cf_name) {
                 db.flush_cf(&cf).map_err(|e| {
@@ -869,6 +865,15 @@
             }
         }

+        db.flush_wal(true).map_err(|e| {
+            ProviderError::Database(DatabaseError::Write(Box::new(DatabaseWriteError {
+                info: DatabaseErrorInfo { message: e.to_string().into(), code: -1 },
+                operation: DatabaseWriteOperation::Flush,
+                table_name: "WAL",
+                key: Vec::new(),
+            })))
+        })?;
+
         Ok(())
     }

diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs
index 64eff68b03..4f1e620bbb 100644
--- a/crates/storage/provider/src/test_utils/noop.rs
+++ b/crates/storage/provider/src/test_utils/noop.rs
@@ -31,7 +31,10 @@ impl RocksDBProviderFactory for NoopProvider<
     }

     #[cfg(all(unix, feature = "rocksdb"))]
-    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {
-        // No-op for NoopProvider
+    fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {}
+
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+        Ok(())
     }
 }
diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs
index 06548d2275..02332a9ccf 100644
--- a/crates/storage/provider/src/traits/rocksdb_provider.rs
+++ b/crates/storage/provider/src/traits/rocksdb_provider.rs
@@ -19,6 +19,14 @@ pub trait RocksDBProviderFactory {
     #[cfg(all(unix, feature = "rocksdb"))]
     fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction);

+    /// Takes all pending `RocksDB` batches and commits them.
+    ///
+    /// This drains the pending batches from the lock and commits each one using the `RocksDB`
+    /// provider. Can be called before flush to persist `RocksDB` writes independently of the
+    /// full commit path.
+    #[cfg(all(unix, feature = "rocksdb"))]
+    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>;
+
     /// Executes a closure with a `RocksDB` transaction for reading.
     ///
     /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
@@ -154,6 +162,10 @@ mod tests {
         }

         fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {}
+
+        fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
+            Ok(())
+        }
     }

     #[test]
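Taken together, the provider-side changes reorder `flush` so the column-family memtables are written to SST files first and the WAL is synced last, and they set `wal_ttl_seconds` and `wal_size_limit_mb` to 0 so obsolete WAL files are deleted as soon as every column family has flushed past them rather than being archived. Below is a standalone sketch of the same option and ordering choices against the plain `rocksdb` crate (not reth's `RocksDBProvider` wrapper); the function and column-family names are illustrative.

```rust
use rocksdb::{Options, DB};

/// Open a DB with prompt WAL cleanup, write a value, then flush in the order the diff
/// switches to: memtables first, synced WAL last.
fn open_write_flush(path: &str) -> Result<(), rocksdb::Error> {
    let cf_names = ["AccountsHistory"]; // illustrative column-family name

    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    // Mirrors the builder change above: no WAL archival, delete obsolete logs ASAP.
    opts.set_wal_ttl_seconds(0);
    opts.set_wal_size_limit_mb(0);

    let db = DB::open_cf(&opts, path, cf_names)?;

    if let Some(cf) = db.cf_handle("AccountsHistory") {
        db.put_cf(cf, b"key", b"value")?;
        // 1. Flush the column-family memtable to an SST file.
        db.flush_cf(cf)?;
    }
    // 2. Sync the WAL so everything written above is durable on disk.
    db.flush_wal(true)?;
    Ok(())
}
```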