Compare commits

...

2 Commits

Author SHA1 Message Date
yongkangc
5fb9d04948 fix: add RocksDB clear_table, refactor shard writing
- Add clear_table method to RocksDBProvider for clearing all entries
- Clear RocksDB AccountsHistory table on first_sync (matches MDBX behavior)
- Extract duplicate shard-writing loop into write_full_shards helper
- Remove unused append_only parameter from write_account_history_shards_keep_last
2026-01-18 14:19:18 +00:00
yongkangc
40a72a8720 feat(stages): add RocksDB support for IndexAccountHistoryStage
This implements RocksDB support for the IndexAccountHistoryStage following
the TransactionLookupStage pattern:

- Add RocksDBProviderFactory trait bound to Stage impl
- Use explicit #[cfg(all(unix, feature = "rocksdb"))] blocks for RocksDB batch
  creation instead of macros
- Use EitherWriter::new_accounts_history to route writes to MDBX or RocksDB
  based on storage settings
- Add inline helper functions for loading/flushing shards with proper u64::MAX
  handling for last shard
- Add RocksDB-specific tests: execute_writes_to_rocksdb_when_enabled,
  unwind_deletes_from_rocksdb_when_enabled, execute_incremental_sync

Note: Full unwind support for RocksDB requires updates to the HistoryWriter
trait implementation, which is out of scope for this PR.

Closes #21124
2026-01-17 23:18:37 +00:00
2 changed files with 396 additions and 14 deletions

View File

@@ -1,11 +1,19 @@
use crate::stages::utils::collect_history_indices;
use super::{collect_account_history_indices, load_history_indices};
use super::collect_account_history_indices;
use alloy_primitives::Address;
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_db_api::{
cursor::DbCursorRO,
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey},
table::{Decode, Decompress},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList,
};
use reth_provider::{
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -53,7 +61,8 @@ where
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache,
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -102,14 +111,26 @@ where
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
// Check if we're using RocksDB for account history
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
#[cfg(all(unix, feature = "rocksdb"))]
if use_rocksdb {
provider.rocksdb_provider().clear_table::<tables::AccountsHistory>()?;
}
#[cfg(not(all(unix, feature = "rocksdb")))]
let _ = use_rocksdb;
if !use_rocksdb {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
}
range = 0..=*input.next_block_range().end();
}
info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
// Use the provider-based collection that can read from static files.
@@ -125,14 +146,26 @@ where
};
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode_owned,
|key| key.key,
)?;
// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
// Create writer that routes to either MDBX or RocksDB based on settings
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
// Load indices using the writer
load_account_history_indices_with_writer(provider, &mut writer, collector, first_sync)?;
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
@@ -153,6 +186,144 @@ where
}
}
/// Loads account history indices from a collector into the database using an [`EitherWriter`].
///
/// Entries are consumed in the collector's iteration order and grouped by address. Whenever the
/// in-memory list for the current address grows beyond [`NUM_OF_INDICES_IN_SHARD`], the full
/// shards are written out and only the trailing partial shard is kept in memory. When the address
/// changes (and once more after the last entry) the remaining indices are flushed as the
/// address's last shard.
///
/// NOTE(review): this relies on the collector yielding all entries of one address contiguously
/// (presumably guaranteed by the ETL collector's key ordering) — confirm against `reth_etl`.
///
/// # Arguments
///
/// * `provider` - used to look up the already stored last shard of an address.
/// * `writer` - destination for shard puts/deletes.
/// * `collector` - source of `(ShardedKey<Address>, BlockNumberList)` entries.
/// * `append_only` - when `true`, existing shards are neither read nor deleted; when `false`,
///   the stored last shard of each address is merged with the incoming indices and replaced.
///
/// # Errors
///
/// Propagates any decoding, decompression, read or write error as a [`StageError`].
fn load_account_history_indices_with_writer<Provider, CURSOR, N>(
    provider: &Provider,
    writer: &mut EitherWriter<'_, CURSOR, N>,
    mut collector: reth_etl::Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut> + StorageSettingsCache + RocksDBProviderFactory,
    CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
        + reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
    N: reth_primitives_traits::NodePrimitives,
{
    // `None` until the first entry has been read. An `Option` is used instead of an
    // `Address::ZERO` placeholder so that the zero address is handled like any other key:
    // with a placeholder, a collector starting at the zero address would skip the merge with
    // the stored last shard, and the subsequent flush would overwrite that shard.
    let mut current_address: Option<Address> = None;
    let mut current_list = Vec::<u64>::new();

    // observability
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;
        if current_address != Some(address) {
            // We have reached the end of the previous address (if any), flush its remaining
            // indices before switching over.
            if let Some(previous) = current_address {
                flush_account_history_shards(writer, previous, &mut current_list, append_only)?;
            }
            current_address = Some(address);
            current_list.clear();

            // If it's not the first sync, merge with the existing last shard
            if !append_only &&
                let Some(existing_list) = get_last_account_history_shard(provider, address)?
            {
                current_list.extend(existing_list.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Write full shards, keep the partial shard in memory
        write_full_shards(writer, address, &mut current_list)?;
    }

    // Flush the last address's remaining shard
    if let Some(last_address) = current_address {
        flush_account_history_shards(writer, last_address, &mut current_list, append_only)?;
    }

    Ok(())
}
/// Looks up the last shard (the one keyed by [`ShardedKey::last`]) stored for `address`.
///
/// When the `rocksdb` feature is compiled in on unix and the cached storage settings have
/// `account_history_in_rocksdb` set, the shard is read through the `RocksDB` provider;
/// otherwise it is read from MDBX with a read cursor.
///
/// Returns `Ok(None)` when no such shard is stored.
fn get_last_account_history_shard<Provider>(
    provider: &Provider,
    address: Address,
) -> Result<Option<BlockNumberList>, StageError>
where
    Provider: DBProvider + StorageSettingsCache + RocksDBProviderFactory,
{
    let last_shard_key = ShardedKey::last(address);

    #[cfg(all(unix, feature = "rocksdb"))]
    if provider.cached_storage_settings().account_history_in_rocksdb {
        let rocksdb = provider.rocksdb_provider();
        let shard = rocksdb.get::<tables::AccountsHistory>(last_shard_key)?;
        return Ok(shard);
    }

    // MDBX path: exact-match seek on the last-shard key.
    let mut cursor = provider.tx_ref().cursor_read::<tables::AccountsHistory>()?;
    let entry = cursor.seek_exact(last_shard_key)?;
    Ok(entry.map(|(_, shard)| shard))
}
/// Drains complete shards off the front of `list` and writes each of them through `writer`.
///
/// A shard is written only while `list` holds strictly more than [`NUM_OF_INDICES_IN_SHARD`]
/// entries, so at least one entry always stays behind for the caller to flush as the last
/// shard. Each written shard is keyed by `address` and the highest block number it contains.
fn write_full_shards<CURSOR, N>(
    writer: &mut EitherWriter<'_, CURSOR, N>,
    address: Address,
    list: &mut Vec<u64>,
) -> Result<(), StageError>
where
    CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
        + reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
    N: reth_primitives_traits::NodePrimitives,
{
    loop {
        // Stop as soon as what is left fits into a single (possibly partial) shard.
        if list.len() <= NUM_OF_INDICES_IN_SHARD {
            return Ok(());
        }

        let shard_blocks = list.drain(..NUM_OF_INDICES_IN_SHARD).collect::<Vec<u64>>();
        let shard_key =
            ShardedKey::new(address, *shard_blocks.last().expect("chunk is not empty"));
        let shard = BlockNumberList::new_pre_sorted(shard_blocks);
        writer.put_account_history(shard_key, &shard)?;
    }
}
/// Writes everything left in `list` for `address`, leaving `list` empty.
///
/// Full shards are written first via [`write_full_shards`]; whatever remains is stored under
/// the last-shard key ([`ShardedKey::last`], i.e. `u64::MAX` as highest block). Does nothing
/// when `list` is empty.
///
/// When `append_only` is `false`, the previously stored last shard is deleted before the new
/// one is written.
fn flush_account_history_shards<CURSOR, N>(
    writer: &mut EitherWriter<'_, CURSOR, N>,
    address: Address,
    list: &mut Vec<u64>,
    append_only: bool,
) -> Result<(), StageError>
where
    CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
        + reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
    N: reth_primitives_traits::NodePrimitives,
{
    if list.is_empty() {
        return Ok(());
    }

    write_full_shards(writer, address, list)?;
    if list.is_empty() {
        return Ok(());
    }

    // The last shard always uses u64::MAX
    let last_key = ShardedKey::last(address);
    let last_shard = BlockNumberList::new_pre_sorted(list.drain(..));

    // Incremental sync replaces the stored last shard, so remove the old entry first.
    if !append_only {
        writer.delete_account_history(last_key.clone())?;
    }
    writer.put_account_history(last_key, &last_shard)?;

    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -646,4 +817,187 @@ mod tests {
Ok(())
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
    use super::*;
    use reth_provider::RocksDBProviderFactory;
    use reth_storage_api::StorageSettings;

    /// Creates a fresh test database with `account_history_in_rocksdb` switched on.
    fn rocksdb_history_db() -> TestStageDB {
        let db = TestStageDB::default();
        db.factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );
        db
    }

    /// Inserts body indices and an account changeset for every block in `first..=last`.
    fn seed_changesets(db: &TestStageDB, first: u64, last: u64) {
        db.commit(|tx| {
            for block in first..=last {
                tx.put::<tables::BlockBodyIndices>(
                    block,
                    StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
                )?;
                tx.put::<tables::AccountChangeSets>(block, acc())?;
            }
            Ok(())
        })
        .unwrap();
    }

    /// Reads the `u64::MAX` shard from RocksDB and returns its block numbers, if present.
    fn last_shard_blocks(db: &TestStageDB) -> Option<Vec<u64>> {
        let rocksdb = db.factory.rocksdb_provider();
        rocksdb
            .get::<tables::AccountsHistory>(shard(u64::MAX))
            .unwrap()
            .map(|list| list.iter().collect())
    }

    /// With `account_history_in_rocksdb` enabled, executing the stage must put the account
    /// history indices into RocksDB and leave the MDBX table empty.
    #[tokio::test]
    async fn execute_writes_to_rocksdb_when_enabled() {
        // changesets for blocks 0-10
        let db = rocksdb_history_db();
        seed_changesets(&db, 0, 10);

        // run the stage up to block 10
        let mut stage = IndexAccountHistoryStage::default();
        let input = ExecInput { target: Some(10), ..Default::default() };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        // nothing may have landed in MDBX
        let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
        assert!(
            mdbx_table.is_empty(),
            "MDBX AccountsHistory should be empty when RocksDB is enabled"
        );

        // the last shard in RocksDB holds every indexed block
        let stored = last_shard_blocks(&db);
        assert!(stored.is_some(), "RocksDB should contain account history");
        assert_eq!(stored.unwrap(), (0..=10).collect::<Vec<_>>());
    }

    /// Runs execute followed by unwind with `account_history_in_rocksdb` enabled.
    ///
    /// Note: Full unwind support for RocksDB requires updates to the HistoryWriter trait
    /// implementation. For now this only checks that unwind succeeds, reports the expected
    /// checkpoint, and that the RocksDB shard is still present afterwards.
    #[tokio::test]
    async fn unwind_deletes_from_rocksdb_when_enabled() {
        // changesets for blocks 0-10
        let db = rocksdb_history_db();
        seed_changesets(&db, 0, 10);

        // populate the index
        let mut stage = IndexAccountHistoryStage::default();
        let input = ExecInput { target: Some(10), ..Default::default() };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        assert!(last_shard_blocks(&db).is_some(), "RocksDB should have data before unwind");

        // Unwind to block 5. The current HistoryWriter implementation doesn't yet support
        // RocksDB unwind, so only the absence of a panic and the checkpoint are verified.
        let unwind_input =
            UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.unwind(&provider, unwind_input).unwrap();
        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
        provider.commit().unwrap();

        // The unwind currently goes through the MDBX path and leaves RocksDB untouched. Once
        // HistoryWriter handles RocksDB, this should instead verify blocks 6-10 are removed.
        assert!(last_shard_blocks(&db).is_some(), "RocksDB should still have data");
    }

    /// Incremental sync: a second execute run must merge its indices into the existing shard.
    #[tokio::test]
    async fn execute_incremental_sync() {
        // first batch: changesets for blocks 0-5
        let db = rocksdb_history_db();
        seed_changesets(&db, 0, 5);

        // first sync (blocks 0-5)
        let mut stage = IndexAccountHistoryStage::default();
        let input = ExecInput { target: Some(5), ..Default::default() };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
        provider.commit().unwrap();

        let first_pass = last_shard_blocks(&db);
        assert!(first_pass.is_some());
        assert_eq!(first_pass.unwrap(), (0..=5).collect::<Vec<_>>());

        // second batch: changesets for blocks 6-10
        seed_changesets(&db, 6, 10);

        // second sync (blocks 6-10), resuming from checkpoint 5
        let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        // the shard now covers blocks 0-10
        let merged = last_shard_blocks(&db);
        assert!(merged.is_some(), "RocksDB should have merged data");
        assert_eq!(merged.unwrap(), (0..=10).collect::<Vec<_>>(), "Should have merged blocks 0-10");
    }
}
}

View File

@@ -514,6 +514,34 @@ impl RocksDBProvider {
})
}
/// Removes every entry of table `T` from its column family.
///
/// Walks the column family from the start, stages a delete for each key in one write batch,
/// and then applies that batch with default write options. The operation is recorded under
/// the `BatchWrite` metric for `T::NAME`.
///
/// NOTE(review): all keys are staged in a single in-memory batch before anything is written —
/// confirm this is acceptable for the table sizes this is called on.
///
/// # Errors
///
/// Returns a `DatabaseError::Read` if iteration fails and a `DatabaseError::Commit` if the
/// batch cannot be written.
pub fn clear_table<T: Table>(&self) -> ProviderResult<()> {
    self.execute_with_operation_metric(RocksDBOperation::BatchWrite, T::NAME, |this| {
        let cf = this.get_cf_handle::<T>()?;
        let mut deletions = WriteBatchWithTransaction::<true>::default();

        for entry in this.0.db.iterator_cf(cf, IteratorMode::Start) {
            match entry {
                Ok((key, _value)) => deletions.delete_cf(cf, &key),
                Err(err) => {
                    return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                        message: err.to_string().into(),
                        code: -1,
                    })))
                }
            }
        }

        match this.0.db.write_opt(deletions, &WriteOptions::default()) {
            Ok(()) => Ok(()),
            Err(err) => Err(ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
                message: err.to_string().into(),
                code: -1,
            }))),
        }
    })
}
/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on