feat(stages): add RocksDB support for IndexAccountHistoryStage (#21165)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Authored by: YK
Date: 2026-01-20 15:23:29 +01:00
Committed by: GitHub
Parent: a0845bab18
Commit: bd144a4c42
6 changed files with 970 additions and 45 deletions

View File

@@ -1,11 +1,10 @@
use crate::stages::utils::collect_history_indices;
use super::{collect_account_history_indices, load_history_indices};
use alloy_primitives::Address;
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
@@ -53,7 +52,8 @@ where
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache,
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -101,15 +101,25 @@ where
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
if use_rocksdb {
// Note: RocksDB clear() executes immediately (not deferred to commit like MDBX),
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
} else {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
}
range = 0..=*input.next_block_range().end();
}
info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
// Use the provider-based collection that can read from static files.
@@ -125,14 +135,13 @@ where
};
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode_owned,
|key| key.key,
)?;
provider.with_rocksdb_batch(|rocksdb_batch| {
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
load_account_history(collector, first_sync, &mut writer)
.map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
@@ -160,7 +169,7 @@ mod tests {
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestStageDB, UnwindStageTestRunner,
};
use alloy_primitives::{address, BlockNumber, B256};
use alloy_primitives::{address, Address, BlockNumber, B256};
use itertools::Itertools;
use reth_db_api::{
cursor::DbCursorRO,
@@ -646,4 +655,169 @@ mod tests {
Ok(())
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `account_history_in_rocksdb` is enabled, the stage
/// writes account history indices to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
// init
let db = TestStageDB::default();
// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
assert!(
mdbx_table.is_empty(),
"MDBX AccountsHistory should be empty when RocksDB is enabled"
);
// Verify RocksDB has the data
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should contain account history");
let block_list = result.unwrap();
let blocks: Vec<u64> = block_list.iter().collect();
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}
/// Test that unwind works correctly when `account_history_in_rocksdb` is enabled.
#[tokio::test]
async fn unwind_works_when_rocksdb_enabled() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
// Verify RocksDB has blocks 0-10 before unwind
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should have data before unwind");
let blocks_before: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());
// Unwind to block 5 (remove blocks 6-10)
let unwind_input =
UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, unwind_input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
provider.commit().unwrap();
// Verify RocksDB now only has blocks 0-5 (blocks 6-10 removed)
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should still have data after unwind");
let blocks_after: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks_after, (0..=5).collect::<Vec<_>>(), "Should only have blocks 0-5");
}
/// Test incremental sync merges new data with existing shards.
#[tokio::test]
async fn execute_incremental_sync() {
let db = TestStageDB::default();
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);
db.commit(|tx| {
for block in 0..=5 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(5), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap();
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
db.commit(|tx| {
for block in 6..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should have merged data");
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}
}
}

View File

@@ -4,13 +4,14 @@ use reth_config::config::EtlConfig;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
table::{Decompress, Table},
table::{Decode, Decompress, Table},
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError,
providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::StageError;
@@ -108,7 +109,7 @@ where
for (address, indices) in cache {
insert_fn(address, indices)?
}
Ok::<(), StageError>(())
Ok(())
}
/// Collects account history indices using a provider that implements `ChangeSetReader`.
@@ -124,12 +125,12 @@ where
let mut cache: HashMap<Address, Vec<u64>> = HashMap::default();
let mut insert_fn = |address: Address, indices: Vec<u64>| {
let last = indices.last().expect("qed");
let last = indices.last().expect("indices is non-empty");
collector.insert(
ShardedKey::new(address, *last),
BlockNumberList::new_pre_sorted(indices.into_iter()),
)?;
Ok::<(), StageError>(())
Ok(())
};
// Convert range bounds to concrete range
@@ -320,6 +321,183 @@ impl LoadMode {
}
}
/// Loads account history indices into the database via `EitherWriter`.
///
/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support
/// both MDBX and `RocksDB` backends.
///
/// ## Process
Iterates over collector entries, grouping indices by address. Indices are flushed to disk
when a shard reaches its maximum length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
ensuring the previous address's final shard is stored.
///
/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
/// incorrectly treating `Address::ZERO` as "no previous address".
pub(crate) fn load_account_history<N, CURSOR>(
mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
let mut current_address: Option<Address> = None;
// Accumulator for block numbers where the current address changed.
let mut current_list = Vec::<u64>::new();
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
}
let address = sharded_key.key;
// When address changes, flush the previous address's shards and start fresh.
if current_address != Some(address) {
// Flush all remaining shards for the previous address (uses u64::MAX for last shard).
if let Some(prev_addr) = current_address {
flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
}
current_address = Some(address);
current_list.clear();
// On incremental sync, merge with the existing last shard from the database.
// The last shard is stored with key (address, u64::MAX) so we can find it.
if !append_only &&
let Some(last_shard) = writer.get_last_account_history_shard(address)?
{
current_list.extend(last_shard.iter());
}
}
// Append new block numbers to the accumulator.
current_list.extend(new_list.iter());
// Flush complete shards, keeping the last (partial) shard buffered.
flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
}
// Flush the final address's remaining shard.
if let Some(addr) = current_address {
flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
}
Ok(())
}
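As a rough, self-contained sketch of the incremental-sync merge described above (toy function, not part of this diff; `start_address` is a hypothetical name): when a new address is first encountered and the run is not append-only, the existing `u64::MAX` shard is pulled from the backend and prepended before the newly collected indices are appended.

fn start_address(
    existing_last_shard: Option<Vec<u64>>,
    new_indices: &[u64],
    append_only: bool,
) -> Vec<u64> {
    let mut list = Vec::new();
    // On incremental sync, seed the accumulator with the existing last shard.
    if !append_only {
        if let Some(last_shard) = existing_last_shard {
            list.extend(last_shard);
        }
    }
    // Then append the newly collected indices.
    list.extend_from_slice(new_indices);
    list
}

fn main() {
    // Incremental sync: blocks 0..=5 already stored, blocks 6..=10 arrive.
    let merged = start_address(Some((0..=5).collect()), &[6, 7, 8, 9, 10], false);
    assert_eq!(merged, (0..=10).collect::<Vec<_>>());
    // First sync (append-only): nothing to merge.
    assert_eq!(start_address(None, &[0, 1], true), vec![0, 1]);
}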
/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
///
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
///
/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`].
fn flush_account_history_shards_partial<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
// Nothing to flush if we haven't filled a complete shard yet.
if list.len() <= NUM_OF_INDICES_IN_SHARD {
return Ok(());
}
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
// Always keep at least one shard buffered for continued accumulation.
// If the length is an exact multiple of the shard size, keep the last full shard.
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
num_full_shards - 1
} else {
num_full_shards
};
if shards_to_flush == 0 {
return Ok(());
}
// Split: flush the first N shards, keep the remainder buffered.
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
let remainder = list.split_off(flush_len);
// Write each complete shard with its highest block number as the key.
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
let highest = *chunk.last().expect("chunk is non-empty");
let key = ShardedKey::new(address, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_account_history(key, &value)?;
} else {
writer.upsert_account_history(key, &value)?;
}
}
// Keep the remaining indices for the next iteration.
*list = remainder;
Ok(())
}
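A minimal standalone sketch of the keep-last arithmetic above (hypothetical helper; the shard size is passed in for readability):

fn shards_to_flush(len: usize, shard_size: usize) -> usize {
    // Nothing to flush until more than one shard's worth of data is buffered.
    if len <= shard_size {
        return 0;
    }
    let num_full_shards = len / shard_size;
    // On an exact multiple, keep the last full shard buffered for further appends.
    if len % shard_size == 0 {
        num_full_shards - 1
    } else {
        num_full_shards
    }
}

fn main() {
    // With a shard size of 100: 250 indices flush 2 shards (50 stay buffered),
    // exactly 200 flush only 1 (the last full shard stays buffered), 80 flush none.
    assert_eq!(shards_to_flush(250, 100), 2);
    assert_eq!(shards_to_flush(200, 100), 1);
    assert_eq!(shards_to_flush(80, 100), 0);
}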
/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
///
/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
///
/// Equivalent to [`load_indices`] with [`LoadMode::Flush`].
fn flush_account_history_shards<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
if list.is_empty() {
return Ok(());
}
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
let is_last = i == num_chunks - 1;
// Use u64::MAX for the final shard's key. This invariant allows incremental sync
// to find the last shard via seek_exact(address, u64::MAX) for merging.
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
let key = ShardedKey::new(address, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_account_history(key, &value)?;
} else {
writer.upsert_account_history(key, &value)?;
}
}
list.clear();
Ok(())
}
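For illustration, a standalone sketch of the resulting key layout (tuples stand in for `ShardedKey`/`BlockNumberList`, and the shard size is shrunk to 3 for readability):

fn shard_keys(blocks: &[u64], shard_size: usize) -> Vec<(u64, Vec<u64>)> {
    let num_chunks = blocks.len().div_ceil(shard_size);
    blocks
        .chunks(shard_size)
        .enumerate()
        .map(|(i, chunk)| {
            // The final shard is always keyed u64::MAX so incremental sync and unwind
            // can locate it with a single seek_exact(address, u64::MAX).
            let highest = if i == num_chunks - 1 { u64::MAX } else { *chunk.last().unwrap() };
            (highest, chunk.to_vec())
        })
        .collect()
}

fn main() {
    // Blocks 1..=7 with a shard size of 3 produce keys 3, 6 and u64::MAX.
    let shards = shard_keys(&(1..=7).collect::<Vec<_>>(), 3);
    assert_eq!(shards.iter().map(|(k, _)| *k).collect::<Vec<_>>(), vec![3, 6, u64::MAX]);
    assert_eq!(shards[2].1, vec![7]);
}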
/// Called when database is ahead of static files. Attempts to find the first block we are missing
/// transactions for.
pub(crate) fn missing_static_data_error<Provider>(

View File

@@ -518,8 +518,22 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
{
/// Puts an account history entry.
pub fn put_account_history(
/// Appends an account history entry (more efficient path, used on first sync).
pub fn append_account_history(
&mut self,
key: ShardedKey<Address>,
value: &BlockNumberList,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.append(key, value)?),
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.put::<tables::AccountsHistory>(key, value),
}
}
/// Upserts an account history entry (for incremental sync).
pub fn upsert_account_history(
&mut self,
key: ShardedKey<Address>,
value: &BlockNumberList,
@@ -532,6 +546,21 @@ where
}
}
/// Gets the last shard for an address (keyed with `u64::MAX`).
pub fn get_last_account_history_shard(
&mut self,
address: Address,
) -> ProviderResult<Option<BlockNumberList>> {
match self {
Self::Database(cursor) => {
Ok(cursor.seek_exact(ShardedKey::last(address))?.map(|(_, v)| v))
}
Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(batch) => batch.get::<tables::AccountsHistory>(ShardedKey::last(address)),
}
}
/// Deletes an account history entry.
pub fn delete_account_history(&mut self, key: ShardedKey<Address>) -> ProviderResult<()> {
match self {
@@ -1266,8 +1295,8 @@ mod rocksdb_tests {
for (highest_block, blocks) in shards {
let key = ShardedKey::new(address, *highest_block);
let value = IntegerList::new(blocks.clone()).unwrap();
mdbx_writer.put_account_history(key.clone(), &value).unwrap();
rocks_writer.put_account_history(key, &value).unwrap();
mdbx_writer.upsert_account_history(key.clone(), &value).unwrap();
rocks_writer.upsert_account_history(key, &value).unwrap();
}
// Commit both backends

View File

@@ -2939,25 +2939,33 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
.into_iter()
.map(|(index, account)| (account.address, *index))
.collect::<Vec<_>>();
last_indices.sort_by_key(|(a, _)| *a);
last_indices.sort_unstable_by_key(|(a, _)| *a);
// Unwind the account history index.
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
for &(address, rem_index) in &last_indices {
let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
&mut cursor,
ShardedKey::last(address),
rem_index,
|sharded_key| sharded_key.key == address,
)?;
// Check the last returned partial shard.
// If it's not empty, the shard needs to be reinserted.
if !partial_shard.is_empty() {
cursor.insert(
if self.cached_storage_settings().account_history_in_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
self.pending_rocksdb_batches.lock().push(batch);
}
} else {
// Unwind the account history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
for &(address, rem_index) in &last_indices {
let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
&mut cursor,
ShardedKey::last(address),
&BlockNumberList::new_pre_sorted(partial_shard),
rem_index,
|sharded_key| sharded_key.key == address,
)?;
// Check the last returned partial shard.
// If it's not empty, the shard needs to be reinserted.
if !partial_shard.is_empty() {
cursor.insert(
ShardedKey::last(address),
&BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
}

View File

@@ -25,7 +25,7 @@ use rocksdb::{
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
fmt,
path::{Path, PathBuf},
sync::Arc,
@@ -430,6 +430,24 @@ impl RocksDBProvider {
})
}
/// Clears all entries from the specified table.
///
/// Uses `delete_range_cf` from empty key to a max key (256 bytes of 0xFF).
/// This end key must exceed the maximum encoded key size for any table.
/// Current max is ~60 bytes (`StorageShardedKey` = 20 + 32 + 8).
pub fn clear<T: Table>(&self) -> ProviderResult<()> {
let cf = self.get_cf_handle::<T>()?;
self.0.db.delete_range_cf(cf, &[] as &[u8], &[0xFF; 256]).map_err(|e| {
ProviderError::Database(DatabaseError::Delete(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
}))
})?;
Ok(())
}
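A tiny standalone check of the bound reasoning above: any encoded key of at most 60 bytes sorts strictly before 256 bytes of 0xFF under the byte-wise ordering assumed here, even in the all-0xFF worst case, because a strict prefix compares as smaller.

fn main() {
    let end_key = [0xFFu8; 256];
    // Worst case: a maximal 60-byte StorageShardedKey-sized key of all 0xFF bytes.
    let max_key = [0xFFu8; 60];
    // A strict prefix is lexicographically smaller, so delete_range_cf covers it.
    assert!(max_key.as_slice() < end_key.as_slice());
}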
/// Gets the first (smallest key) entry from the specified table.
pub fn first<T: Table>(&self) -> ProviderResult<Option<(T::Key, T::Value)>> {
self.execute_with_operation_metric(RocksDBOperation::Get, T::NAME, |this| {
@@ -489,6 +507,90 @@ impl RocksDBProvider {
Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData })
}
/// Returns all account history shards for the given address in ascending key order.
///
/// This is used for unwind operations where we need to scan all shards for an address
/// and potentially delete or truncate them.
pub fn account_history_shards(
&self,
address: Address,
) -> ProviderResult<Vec<(ShardedKey<Address>, BlockNumberList)>> {
// Get the column family handle for the AccountsHistory table.
let cf = self.get_cf_handle::<tables::AccountsHistory>()?;
// Build a seek key starting at the first shard (highest_block_number = 0) for this address.
// ShardedKey is (address, highest_block_number) so this positions us at the beginning.
let start_key = ShardedKey::new(address, 0u64);
let start_bytes = start_key.encode();
// Create a forward iterator starting from our seek position.
let iter = self
.0
.db
.iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward));
let mut result = Vec::new();
for item in iter {
match item {
Ok((key_bytes, value_bytes)) => {
// Decode the sharded key to check if we're still on the same address.
let key = ShardedKey::<Address>::decode(&key_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
// Stop when we reach a different address (keys are sorted by address first).
if key.key != address {
break;
}
// Decompress the block number list stored in this shard.
let value = BlockNumberList::decompress(&value_bytes)
.map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
result.push((key, value));
}
Err(e) => {
return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
message: e.to_string().into(),
code: -1,
})));
}
}
}
Ok(result)
}
/// Unwinds account history indices for the given `(address, block_number)` pairs.
///
/// Groups addresses by their minimum block number and calls the appropriate unwind
/// operations. For each address, keeps only blocks less than the minimum block
/// (i.e., removes the minimum block and all higher blocks).
///
/// Returns a `WriteBatchWithTransaction` that can be committed later.
pub fn unwind_account_history_indices(
&self,
last_indices: &[(Address, BlockNumber)],
) -> ProviderResult<WriteBatchWithTransaction<true>> {
let mut address_min_block: HashMap<Address, BlockNumber> =
HashMap::with_capacity_and_hasher(last_indices.len(), Default::default());
for &(address, block_number) in last_indices {
address_min_block
.entry(address)
.and_modify(|min| *min = (*min).min(block_number))
.or_insert(block_number);
}
let mut batch = self.batch();
for (address, min_block) in address_min_block {
match min_block.checked_sub(1) {
Some(keep_to) => batch.unwind_account_history_to(address, keep_to)?,
None => batch.clear_account_history(address)?,
}
}
Ok(batch.into_inner())
}
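A minimal sketch of the grouping step (addresses modeled as plain byte arrays):

use std::collections::HashMap;

fn min_block_per_address(last_indices: &[([u8; 20], u64)]) -> HashMap<[u8; 20], u64> {
    let mut out = HashMap::new();
    for &(address, block) in last_indices {
        // Keep the smallest changed block seen for each address.
        out.entry(address)
            .and_modify(|min: &mut u64| *min = (*min).min(block))
            .or_insert(block);
    }
    out
}

fn main() {
    let a = [0x11u8; 20];
    // Changesets at blocks 7, 9 and 5 collapse to a minimum of 5, so the unwind
    // keeps only blocks <= 4 (min_block - 1) for this address.
    let grouped = min_block_per_address(&[(a, 7), (a, 9), (a, 5)]);
    assert_eq!(grouped[&a], 5);
}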
/// Writes a batch of operations atomically.
pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
where
@@ -719,6 +821,14 @@ impl<'a> RocksDBBatch<'a> {
self.inner
}
/// Gets a value from the database.
///
/// **Important constraint:** This reads only committed state, not pending writes in this
/// batch or other pending batches in `pending_rocksdb_batches`.
pub fn get<T: Table>(&self, key: T::Key) -> ProviderResult<Option<T::Value>> {
self.provider.get::<T>(key)
}
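A hedged sketch of that constraint, in the style of the tests at the bottom of this file (not part of the diff): a put buffered in a batch is not visible through `get` until the batch is committed.

#[test]
fn pending_batch_writes_are_not_visible_to_get() {
    let temp_dir = TempDir::new().unwrap();
    let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
    let key = ShardedKey::new(Address::from([0x42; 20]), u64::MAX);
    let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
    let mut batch = provider.batch();
    batch.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
    // The pending put lives only in the batch; get() reads committed state.
    assert!(batch.get::<tables::AccountsHistory>(key.clone()).unwrap().is_none());
    batch.commit().unwrap();
    assert!(provider.get::<tables::AccountsHistory>(key).unwrap().is_some());
}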
/// Appends indices to an account history shard with proper shard management.
///
/// Loads the existing shard (if any), appends new indices, and rechunks into
@@ -841,6 +951,91 @@ impl<'a> RocksDBBatch<'a> {
Ok(())
}
/// Unwinds account history for the given address, keeping only blocks <= `keep_to`.
///
/// Mirrors MDBX `unwind_history_shards` behavior:
- Deletes shards whose blocks lie entirely above `keep_to`
- Truncates the boundary shard and re-keys it to the `u64::MAX` sentinel
- Preserves shards whose blocks lie entirely at or below `keep_to`
pub fn unwind_account_history_to(
&mut self,
address: Address,
keep_to: BlockNumber,
) -> ProviderResult<()> {
let shards = self.provider.account_history_shards(address)?;
if shards.is_empty() {
return Ok(());
}
// Find the first shard that might contain blocks > keep_to.
// A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
let boundary_idx = shards.iter().position(|(key, _)| {
key.highest_block_number == u64::MAX || key.highest_block_number > keep_to
});
// Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
let Some(boundary_idx) = boundary_idx else {
let (last_key, last_value) = shards.last().expect("shards is non-empty");
if last_key.highest_block_number != u64::MAX {
self.delete::<tables::AccountsHistory>(last_key.clone())?;
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX),
last_value,
)?;
}
return Ok(());
};
// Delete all shards strictly after the boundary (they are entirely > keep_to)
for (key, _) in shards.iter().skip(boundary_idx + 1) {
self.delete::<tables::AccountsHistory>(key.clone())?;
}
// Process the boundary shard: filter out blocks > keep_to
let (boundary_key, boundary_list) = &shards[boundary_idx];
// Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
self.delete::<tables::AccountsHistory>(boundary_key.clone())?;
// Build truncated list once; check emptiness directly (avoids double iteration)
let new_last =
BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
if new_last.is_empty() {
// Boundary shard is now empty. Previous shard becomes the last and must be keyed
// u64::MAX.
if boundary_idx == 0 {
// Nothing left for this address
return Ok(());
}
let (prev_key, prev_value) = &shards[boundary_idx - 1];
if prev_key.highest_block_number != u64::MAX {
self.delete::<tables::AccountsHistory>(prev_key.clone())?;
self.put::<tables::AccountsHistory>(
ShardedKey::new(address, u64::MAX),
prev_value,
)?;
}
return Ok(());
}
self.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &new_last)?;
Ok(())
}
/// Clears all account history shards for the given address.
///
Used when the minimum unwound block is 0, i.e. all history for the address is removed.
pub fn clear_account_history(&mut self, address: Address) -> ProviderResult<()> {
let shards = self.provider.account_history_shards(address)?;
for (key, _) in shards {
self.delete::<tables::AccountsHistory>(key)?;
}
Ok(())
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.
@@ -1720,4 +1915,318 @@ mod tests {
"sentinel shard should exist"
);
}
#[test]
fn test_clear_table() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let key = ShardedKey::new(address, u64::MAX);
let blocks = BlockNumberList::new_pre_sorted([1, 2, 3]);
provider.put::<tables::AccountsHistory>(key.clone(), &blocks).unwrap();
assert!(provider.get::<tables::AccountsHistory>(key.clone()).unwrap().is_some());
provider.clear::<tables::AccountsHistory>().unwrap();
assert!(
provider.get::<tables::AccountsHistory>(key).unwrap().is_none(),
"table should be empty after clear"
);
assert!(
provider.first::<tables::AccountsHistory>().unwrap().is_none(),
"first() should return None after clear"
);
}
#[test]
fn test_clear_empty_table() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
provider.clear::<tables::AccountsHistory>().unwrap();
assert!(provider.first::<tables::AccountsHistory>().unwrap().is_none());
}
#[test]
fn test_unwind_account_history_to_basic() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Add blocks 0-10
let mut batch = provider.batch();
batch.append_account_history_shard(address, 0..=10).unwrap();
batch.commit().unwrap();
// Verify we have blocks 0-10
let key = ShardedKey::new(address, u64::MAX);
let result = provider.get::<tables::AccountsHistory>(key.clone()).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
// Unwind to block 5 (keep blocks 0-5, remove 6-10)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 5).unwrap();
batch.commit().unwrap();
// Verify only blocks 0-5 remain
let result = provider.get::<tables::AccountsHistory>(key).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
}
#[test]
fn test_unwind_account_history_to_removes_all() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Add blocks 5-10
let mut batch = provider.batch();
batch.append_account_history_shard(address, 5..=10).unwrap();
batch.commit().unwrap();
// Unwind to block 4 (removes all blocks since they're all > 4)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 4).unwrap();
batch.commit().unwrap();
// Verify no data remains for this address
let key = ShardedKey::new(address, u64::MAX);
let result = provider.get::<tables::AccountsHistory>(key).unwrap();
assert!(result.is_none(), "Should have no data after full unwind");
}
#[test]
fn test_unwind_account_history_to_no_op() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Add blocks 0-5
let mut batch = provider.batch();
batch.append_account_history_shard(address, 0..=5).unwrap();
batch.commit().unwrap();
// Unwind to block 10 (no-op since all blocks are <= 10)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 10).unwrap();
batch.commit().unwrap();
// Verify blocks 0-5 still remain
let key = ShardedKey::new(address, u64::MAX);
let result = provider.get::<tables::AccountsHistory>(key).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());
}
#[test]
fn test_unwind_account_history_to_block_zero() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Add blocks 0-5 (including block 0)
let mut batch = provider.batch();
batch.append_account_history_shard(address, 0..=5).unwrap();
batch.commit().unwrap();
// Unwind to block 0 (keep only block 0, remove 1-5)
// This simulates the caller doing: unwind_to = min_block.checked_sub(1) where min_block = 1
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 0).unwrap();
batch.commit().unwrap();
// Verify only block 0 remains
let key = ShardedKey::new(address, u64::MAX);
let result = provider.get::<tables::AccountsHistory>(key).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, vec![0]);
}
#[test]
fn test_unwind_account_history_to_multi_shard() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create multiple shards by adding more than NUM_OF_INDICES_IN_SHARD entries
// For testing, we'll manually create shards with specific keys
let mut batch = provider.batch();
// First shard: blocks 1-50, keyed by 50
let shard1 = BlockNumberList::new_pre_sorted(1..=50);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
// Second shard: blocks 51-100, keyed by MAX (sentinel)
let shard2 = BlockNumberList::new_pre_sorted(51..=100);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
batch.commit().unwrap();
// Verify we have 2 shards
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 2);
// Unwind to block 75 (keep 1-75, remove 76-100)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 75).unwrap();
batch.commit().unwrap();
// Verify: shard1 should be untouched, shard2 should be truncated
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 2);
// First shard unchanged
assert_eq!(shards[0].0.highest_block_number, 50);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
// Second shard truncated and re-keyed to MAX
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
}
#[test]
fn test_unwind_account_history_to_multi_shard_boundary_empty() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create two shards
let mut batch = provider.batch();
// First shard: blocks 1-50, keyed by 50
let shard1 = BlockNumberList::new_pre_sorted(1..=50);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
// Second shard: blocks 75-100, keyed by MAX
let shard2 = BlockNumberList::new_pre_sorted(75..=100);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard2).unwrap();
batch.commit().unwrap();
// Unwind to block 60 (removes all of shard2 since 75 > 60, promotes shard1 to MAX)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 60).unwrap();
batch.commit().unwrap();
// Verify: only shard1 remains, now keyed as MAX
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].0.highest_block_number, u64::MAX);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
}
#[test]
fn test_account_history_shards_iterator() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
let other_address = Address::from([0x43; 20]);
// Add data for two addresses
let mut batch = provider.batch();
batch.append_account_history_shard(address, 0..=5).unwrap();
batch.append_account_history_shard(other_address, 10..=15).unwrap();
batch.commit().unwrap();
// Query shards for first address only
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].0.key, address);
// Query shards for second address only
let shards = provider.account_history_shards(other_address).unwrap();
assert_eq!(shards.len(), 1);
assert_eq!(shards[0].0.key, other_address);
// Query shards for non-existent address
let non_existent = Address::from([0x99; 20]);
let shards = provider.account_history_shards(non_existent).unwrap();
assert!(shards.is_empty());
}
#[test]
fn test_clear_account_history() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Add blocks 0-10
let mut batch = provider.batch();
batch.append_account_history_shard(address, 0..=10).unwrap();
batch.commit().unwrap();
// Clear all history (simulates unwind from block 0)
let mut batch = provider.batch();
batch.clear_account_history(address).unwrap();
batch.commit().unwrap();
// Verify no data remains
let shards = provider.account_history_shards(address).unwrap();
assert!(shards.is_empty(), "All shards should be deleted");
}
#[test]
fn test_unwind_non_sentinel_boundary() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();
let address = Address::from([0x42; 20]);
// Create three shards with non-sentinel boundary
let mut batch = provider.batch();
// Shard 1: blocks 1-50, keyed by 50
let shard1 = BlockNumberList::new_pre_sorted(1..=50);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 50), &shard1).unwrap();
// Shard 2: blocks 51-100, keyed by 100 (non-sentinel, will be boundary)
let shard2 = BlockNumberList::new_pre_sorted(51..=100);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, 100), &shard2).unwrap();
// Shard 3: blocks 101-150, keyed by MAX (will be deleted)
let shard3 = BlockNumberList::new_pre_sorted(101..=150);
batch.put::<tables::AccountsHistory>(ShardedKey::new(address, u64::MAX), &shard3).unwrap();
batch.commit().unwrap();
// Verify 3 shards
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 3);
// Unwind to block 75 (truncates shard2, deletes shard3)
let mut batch = provider.batch();
batch.unwind_account_history_to(address, 75).unwrap();
batch.commit().unwrap();
// Verify: shard1 unchanged, shard2 truncated and re-keyed to MAX, shard3 deleted
let shards = provider.account_history_shards(address).unwrap();
assert_eq!(shards.len(), 2);
// First shard unchanged
assert_eq!(shards[0].0.highest_block_number, 50);
assert_eq!(shards[0].1.iter().collect::<Vec<_>>(), (1..=50).collect::<Vec<_>>());
// Second shard truncated and re-keyed to MAX
assert_eq!(shards[1].0.highest_block_number, u64::MAX);
assert_eq!(shards[1].1.iter().collect::<Vec<_>>(), (51..=75).collect::<Vec<_>>());
}
}

View File

@@ -1,4 +1,7 @@
use crate::{either_writer::RocksTxRefArg, providers::RocksDBProvider};
use crate::{
either_writer::{RawRocksDBBatch, RocksBatchArg, RocksTxRefArg},
providers::RocksDBProvider,
};
use reth_storage_errors::provider::ProviderResult;
/// `RocksDB` provider factory.
@@ -31,4 +34,28 @@ pub trait RocksDBProviderFactory {
#[cfg(not(all(unix, feature = "rocksdb")))]
f(())
}
/// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
///
/// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
where
F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
{
#[cfg(all(unix, feature = "rocksdb"))]
{
let rocksdb = self.rocksdb_provider();
let batch = rocksdb.batch();
let (result, raw_batch) = f(batch)?;
if let Some(b) = raw_batch {
self.set_pending_rocksdb_batch(b);
}
Ok(result)
}
#[cfg(not(all(unix, feature = "rocksdb")))]
{
let (result, _) = f(())?;
Ok(result)
}
}
}
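A usage sketch, mirroring the call site in the stage diff above (error handling elided, `provider` assumed to implement `RocksDBProviderFactory`):

// Inside a stage's execute(), write through EitherWriter and register the batch for commit.
provider.with_rocksdb_batch(|rocksdb_batch| {
    let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
    // ... write account history shards through `writer` ...
    Ok(((), writer.into_raw_rocksdb_batch()))
})?;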