feat(stages): add RocksDB support for IndexStorageHistoryStage (#21175)

This commit is contained in:
YK
2026-01-21 18:05:19 +01:00
committed by GitHub
parent dd72cfe23e
commit 624ddc5779
5 changed files with 770 additions and 208 deletions

View File

@@ -1,19 +1,21 @@
use super::{collect_history_indices, load_history_indices};
use crate::{StageCheckpoint, StageId};
use super::collect_history_indices;
use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress},
table::Decode,
tables,
transaction::DbTxMut,
};
use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter};
use reth_provider::{
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use std::fmt::Debug;
use tracing::info;
/// Stage is indexing history the account changesets generated in
/// Stage that indexes the history of the storage changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
/// on index sharding take a look at [`tables::StoragesHistory`].
#[derive(Debug)]
@@ -34,7 +36,7 @@ impl IndexStorageHistoryStage {
etl_config: EtlConfig,
prune_mode: Option<PruneMode>,
) -> Self {
Self { commit_threshold: config.commit_threshold, prune_mode, etl_config }
Self { commit_threshold: config.commit_threshold, etl_config, prune_mode }
}
}
@@ -46,8 +48,13 @@ impl Default for IndexStorageHistoryStage {
impl<Provider> Stage<Provider> for IndexStorageHistoryStage
where
Provider:
DBProvider<Tx: DbTxMut> + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader,
Provider: DBProvider<Tx: DbTxMut>
+ HistoryWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ StorageSettingsCache
+ RocksDBProviderFactory
+ reth_provider::NodePrimitivesProvider,
{
/// Return the id of the stage
fn id(&self) -> StageId {
@@ -95,15 +102,25 @@ where
let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb;
// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
if use_rocksdb {
// Note: RocksDB clear() executes immediately (not deferred to commit like MDBX),
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::StoragesHistory>()?;
} else {
provider.tx_ref().clear::<tables::StoragesHistory>()?;
}
range = 0..=*input.next_block_range().end();
}
info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices");
info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
let collector =
collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>(
provider,
@@ -116,16 +133,13 @@ where
)?;
info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database");
load_history_indices::<_, tables::StoragesHistory, _>(
provider,
collector,
first_sync,
|AddressStorageKey((address, storage_key)), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
StorageShardedKey::decode_owned,
|key| AddressStorageKey((key.address, key.sharded_key.key)),
)?;
provider.with_rocksdb_batch(|rocksdb_batch| {
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;
load_storage_history(collector, first_sync, &mut writer)
.map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
@@ -382,12 +396,12 @@ mod tests {
async fn insert_index_second_half_shard() {
// init
let db = TestStageDB::default();
let mut close_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();
let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::<Vec<_>>();
// setup
partial_setup(&db);
db.commit(|tx| {
tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&close_full_list)).unwrap();
tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&almost_full_list)).unwrap();
Ok(())
})
.unwrap();
@@ -396,12 +410,12 @@ mod tests {
run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1));
// verify
close_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD);
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([
(shard(LAST_BLOCK_IN_FULL_SHARD), close_full_list.clone()),
(shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()),
(shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1])
])
);
@@ -410,9 +424,9 @@ mod tests {
unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1);
// verify initial state
close_full_list.pop();
almost_full_list.pop();
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list)]));
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)]));
}
#[tokio::test]
@@ -663,4 +677,294 @@ mod tests {
Ok(())
}
}
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;
/// Test that when `storages_history_in_rocksdb` is enabled, the stage
/// writes storage history indices to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
    let db = TestStageDB::default();
    // Route storage-history writes to the RocksDB backend for this test.
    db.factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // setup: seed blocks 0..=10 with body indices and one storage changeset each
    db.commit(|tx| {
        for block in 0..=10 {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // execute the stage up to the target block
    let input = ExecInput { target: Some(10), ..Default::default() };
    let mut stage = IndexStorageHistoryStage::default();
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
    provider.commit().unwrap();

    // verify: MDBX must remain untouched when the RocksDB backend is selected
    let mdbx_table = db.table::<tables::StoragesHistory>().unwrap();
    assert!(
        mdbx_table.is_empty(),
        "MDBX StoragesHistory should be empty when RocksDB is enabled"
    );

    // verify: the single (not yet full) shard is stored under the u64::MAX key in RocksDB
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should contain storage history");
    let block_list = result.unwrap();
    let blocks: Vec<u64> = block_list.iter().collect();
    assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}
/// Test that unwind works correctly when `storages_history_in_rocksdb` is enabled.
#[tokio::test]
async fn unwind_works_when_rocksdb_enabled() {
    let db = TestStageDB::default();
    // Route storage-history writes to the RocksDB backend for this test.
    db.factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // setup: seed blocks 0..=10 with body indices and one storage changeset each
    db.commit(|tx| {
        for block in 0..=10 {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // execute the stage up to block 10
    let input = ExecInput { target: Some(10), ..Default::default() };
    let mut stage = IndexStorageHistoryStage::default();
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
    provider.commit().unwrap();

    // sanity check: all indexed blocks are present in the RocksDB shard before unwinding
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should have data before unwind");
    let blocks_before: Vec<u64> = result.unwrap().iter().collect();
    assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());

    // unwind back to block 5
    let unwind_input =
        UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.unwind(&provider, unwind_input).unwrap();
    assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
    provider.commit().unwrap();

    // verify: only blocks 0..=5 remain in the shard after the partial unwind
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should still have data after partial unwind");
    let blocks_after: Vec<u64> = result.unwrap().iter().collect();
    assert_eq!(
        blocks_after,
        (0..=5).collect::<Vec<_>>(),
        "Should only have blocks 0-5 after unwind to block 5"
    );
}
/// Test that unwind to block 0 keeps only block 0's history.
#[tokio::test]
async fn unwind_to_zero_keeps_block_zero() {
    let db = TestStageDB::default();
    // Route storage-history writes to the RocksDB backend for this test.
    db.factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // setup: seed blocks 0..=5 with body indices and one storage changeset each
    db.commit(|tx| {
        for block in 0..=5 {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // execute the stage up to block 5
    let input = ExecInput { target: Some(5), ..Default::default() };
    let mut stage = IndexStorageHistoryStage::default();
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
    provider.commit().unwrap();

    // sanity check: the shard exists before unwinding
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should have data before unwind");

    // unwind all the way back to block 0
    let unwind_input =
        UnwindInput { checkpoint: StageCheckpoint::new(5), unwind_to: 0, bad_block: None };
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.unwind(&provider, unwind_input).unwrap();
    assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
    provider.commit().unwrap();

    // verify: block 0's history entry must survive a full unwind to 0
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should still have block 0 history");
    let blocks_after: Vec<u64> = result.unwrap().iter().collect();
    assert_eq!(blocks_after, vec![0], "Should only have block 0 after unwinding to 0");
}
/// Test incremental sync merges new data with existing shards.
#[tokio::test]
async fn execute_incremental_sync() {
    let db = TestStageDB::default();
    // Route storage-history writes to the RocksDB backend for this test.
    db.factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // setup: seed the first batch of blocks 0..=5
    db.commit(|tx| {
        for block in 0..=5 {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // first (full) sync up to block 5
    let input = ExecInput { target: Some(5), ..Default::default() };
    let mut stage = IndexStorageHistoryStage::default();
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
    provider.commit().unwrap();

    // verify: the shard holds blocks 0..=5 after the first run
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some());
    let blocks: Vec<u64> = result.unwrap().iter().collect();
    assert_eq!(blocks, (0..=5).collect::<Vec<_>>());

    // setup: seed the second batch of blocks 6..=10
    db.commit(|tx| {
        for block in 6..=10 {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // incremental sync from checkpoint 5 to block 10; must merge with the existing shard
    let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
    provider.commit().unwrap();

    // verify: the shard now contains the merged range 0..=10
    let rocksdb = db.factory.rocksdb_provider();
    let result = rocksdb.get::<tables::StoragesHistory>(shard(u64::MAX)).unwrap();
    assert!(result.is_some(), "RocksDB should have merged data");
    let blocks: Vec<u64> = result.unwrap().iter().collect();
    assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}
/// Test multi-shard unwind correctly handles shards that span across unwind boundary.
#[tokio::test]
async fn unwind_multi_shard() {
    use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD;

    let db = TestStageDB::default();
    // Route storage-history writes to the RocksDB backend for this test.
    db.factory.set_storage_settings_cache(
        StorageSettings::legacy().with_storages_history_in_rocksdb(true),
    );

    // Enough blocks to force at least two full shards plus a partial one.
    let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64;

    // setup: one changeset per block so each block lands in the history index
    db.commit(|tx| {
        for block in 0..num_blocks {
            tx.put::<tables::BlockBodyIndices>(
                block,
                StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
            )?;
            tx.put::<tables::StorageChangeSets>(
                block_number_address(block),
                storage(STORAGE_KEY),
            )?;
        }
        Ok(())
    })
    .unwrap();

    // execute the stage over the whole range
    let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() };
    let mut stage = IndexStorageHistoryStage::default();
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.execute(&provider, input).unwrap();
    assert_eq!(
        out,
        ExecOutput { checkpoint: StageCheckpoint::new(num_blocks - 1), done: true }
    );
    provider.commit().unwrap();

    // sanity check: the data really is split across multiple shards
    let rocksdb = db.factory.rocksdb_provider();
    let shards = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
    assert!(shards.len() >= 2, "Should have at least 2 shards for {} blocks", num_blocks);

    // unwind to a point inside the second shard, crossing the first shard boundary
    let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 + 50;
    let unwind_input = UnwindInput {
        checkpoint: StageCheckpoint::new(num_blocks - 1),
        unwind_to,
        bad_block: None,
    };
    let provider = db.factory.database_provider_rw().unwrap();
    let out = stage.unwind(&provider, unwind_input).unwrap();
    assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) });
    provider.commit().unwrap();

    // verify: flattening all remaining shards yields exactly blocks 0..=unwind_to
    let rocksdb = db.factory.rocksdb_provider();
    let shards_after = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap();
    assert!(!shards_after.is_empty(), "Should still have shards after unwind");
    let all_blocks: Vec<u64> =
        shards_after.iter().flat_map(|(_, list)| list.iter()).collect();
    assert_eq!(
        all_blocks,
        (0..=unwind_to).collect::<Vec<_>>(),
        "Should only have blocks 0 to {} after unwind",
        unwind_to
    );
}
}
}

View File

@@ -1,12 +1,15 @@
//! Utils for `stages`.
use alloy_primitives::{Address, BlockNumber, TxNumber};
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use reth_config::config::EtlConfig;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
AccountBeforeTx, ShardedKey,
},
table::{Decode, Decompress, Table},
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
transaction::DbTx,
BlockNumberList,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
@@ -171,164 +174,9 @@ where
Ok(collector)
}
/// Given a [`Collector`] created by [`collect_history_indices`] it iterates all entries, loading
/// the indices into the database in shards.
///
/// ## Process
/// Iterates over elements, grouping indices by their partial keys (e.g., `Address` or
/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
/// key shard is stored.
pub(crate) fn load_history_indices<Provider, H, P>(
    provider: &Provider,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut>,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;
    // Partial key currently being accumulated; `None` until the first collector entry.
    let mut current_partial = None;
    // Block numbers buffered for `current_partial`, flushed in shard-sized chunks.
    let mut current_list = Vec::<u64>::new();

    // observability
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        // Log progress roughly every 10% of entries.
        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        // AccountsHistory: `Address`.
        // StorageHistory: `Address.StorageKey`.
        let partial_key = get_partial(sharded_key);

        if current_partial != Some(partial_key) {
            // We have reached the end of this subset of keys so
            // we need to flush its last indice shard.
            if let Some(current) = current_partial {
                load_indices(
                    &mut write_cursor,
                    current,
                    &mut current_list,
                    &sharded_key_factory,
                    append_only,
                    LoadMode::Flush,
                )?;
            }

            current_partial = Some(partial_key);
            current_list.clear();

            // If it's not the first sync, there might an existing shard already, so we need to
            // merge it with the one coming from the collector
            if !append_only &&
                let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(partial_key, u64::MAX))?
            {
                current_list.extend(last_database_shard.iter());
            }
        }

        current_list.extend(new_list.iter());
        // Flush any complete shards now, but keep the trailing (possibly partial) shard
        // in memory since it may still grow on the next entries for this partial key.
        load_indices(
            &mut write_cursor,
            partial_key,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // There will be one remaining shard that needs to be flushed to DB.
    if let Some(current) = current_partial {
        load_indices(
            &mut write_cursor,
            current,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::Flush,
        )?;
    }

    Ok(())
}
/// Shard and insert the indices list according to [`LoadMode`] and its length.
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    // Only write once the buffer has spilled past one full shard, or when a flush
    // was explicitly requested (end of a partial key, or end of the collector).
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        let chunks = list
            .chunks(NUM_OF_INDICES_IN_SHARD)
            .map(|chunks| chunks.to_vec())
            .collect::<Vec<Vec<u64>>>();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // KeepLast: the trailing chunk stays buffered in `list` for further
                // accumulation instead of being written out.
                *list = chunk;
            } else {
                if iter.peek().is_none() {
                    // The final shard of a flush is keyed with `u64::MAX`; this invariant
                    // lets the next incremental sync find it with a `seek_exact`.
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                if append_only {
                    cursor.append(key, &value)?;
                } else {
                    cursor.upsert(key, &value)?;
                }
            }
        }
    }
    Ok(())
}
/// Mode on how to load index shards into the database.
pub(crate) enum LoadMode {
    /// Keep the last shard in memory and don't flush it to the database.
    KeepLast,
    /// Flush all shards into the database.
    Flush,
}

impl LoadMode {
    /// Returns `true` when every buffered shard should be written out.
    const fn is_flush(&self) -> bool {
        matches!(self, Self::Flush)
    }
}
/// Loads account history indices into the database via `EitherWriter`.
///
/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support
/// both MDBX and `RocksDB` backends.
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their address. It flushes indices to disk
@@ -404,8 +252,6 @@ where
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
///
/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`].
fn flush_account_history_shards_partial<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
@@ -462,8 +308,6 @@ where
///
/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
///
/// Equivalent to [`load_indices`] with [`LoadMode::Flush`].
fn flush_account_history_shards<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
@@ -537,3 +381,191 @@ where
segment,
})
}
/// Loads storage history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes
/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the
/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored.
///
/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
///
/// `append_only` should be `true` only on a first sync (empty table), where append-style
/// writes are valid; otherwise shards are upserted and merged with existing data.
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    let mut current_key: Option<(Address, B256)> = None;
    // Accumulator for block numbers where the current (address, storage_key) changed.
    let mut current_list = Vec::<u64>::new();

    // Progress logging: report roughly every 10% of collector entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        // When (address, storage_key) changes, flush the previous key's shards and start fresh.
        if current_key != Some(partial_key) {
            // Flush all remaining shards for the previous key (uses u64::MAX for last shard).
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // On incremental sync, merge with the existing last shard from the database.
            // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        // Append new block numbers to the accumulator.
        current_list.extend(new_list.iter());
        // Flush complete shards, keeping the last (partial) shard buffered.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Flush the final key's remaining shard.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
///
/// Writes out full shards only once more than one shard's worth of indices has
/// accumulated; the last (possibly partial) shard always stays in `list` so it can keep
/// growing before being finalized, avoiding rewrites of a shard that is still filling up.
fn flush_storage_history_shards_partial<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // Nothing to do until the buffer spills past one full shard.
    if list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(());
    }

    // Number of full shards we can write while still holding one shard back.
    // For `len == k * SHARD` this is `k - 1` (keep the last full shard buffered);
    // otherwise it is `len / SHARD` (keep the partial remainder buffered).
    let writable_shards = (list.len() - 1) / NUM_OF_INDICES_IN_SHARD;
    let buffered_tail = list.split_off(writable_shards * NUM_OF_INDICES_IN_SHARD);

    // Each full shard is keyed by the highest block number it contains.
    for full_shard in list.chunks(NUM_OF_INDICES_IN_SHARD) {
        let highest_block = *full_shard.last().expect("chunk is non-empty");
        let sharded_key = StorageShardedKey::new(address, storage_key, highest_block);
        let block_list = BlockNumberList::new_pre_sorted(full_shard.iter().copied());
        if append_only {
            writer.append_storage_history(sharded_key, &block_list)?;
        } else {
            writer.upsert_storage_history(sharded_key, &block_list)?;
        }
    }

    // Carry the held-back shard forward for continued accumulation.
    *list = buffered_tail;
    Ok(())
}
/// Flushes all remaining shards for storage history, keying the final shard with `u64::MAX`.
///
/// The `u64::MAX` key on the last shard is an invariant: it lets incremental sync locate
/// the most recent shard via `seek_exact(address, storage_key, u64::MAX)` and merge new
/// indices into it. Earlier shards are keyed by their highest contained block number.
fn flush_storage_history_shards<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    if list.is_empty() {
        return Ok(());
    }

    let mut shards = list.chunks(NUM_OF_INDICES_IN_SHARD).peekable();
    while let Some(shard) = shards.next() {
        // The final shard gets the u64::MAX sentinel key (see invariant above);
        // every other shard is keyed by its highest block number.
        let highest = if shards.peek().is_none() {
            u64::MAX
        } else {
            *shard.last().expect("chunk is non-empty")
        };
        let key = StorageShardedKey::new(address, storage_key, highest);
        let value = BlockNumberList::new_pre_sorted(shard.iter().copied());
        if append_only {
            writer.append_storage_history(key, &value)?;
        } else {
            writer.upsert_storage_history(key, &value)?;
        }
    }

    list.clear();
    Ok(())
}

View File

@@ -13,7 +13,7 @@ use crate::{
providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber};
use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256};
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRW},
@@ -512,6 +512,49 @@ where
Self::RocksDB(batch) => batch.delete::<tables::StoragesHistory>(key),
}
}
/// Appends a storage history entry (for first sync - more efficient).
///
/// Callers must supply keys in ascending order for the MDBX backend, since `append`
/// only accepts keys past the current end of the table. For `RocksDB` this is a plain
/// batched put; static files do not support this table.
pub fn append_storage_history(
    &mut self,
    key: StorageShardedKey,
    value: &BlockNumberList,
) -> ProviderResult<()> {
    match self {
        Self::Database(cursor) => Ok(cursor.append(key, value)?),
        Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
        #[cfg(all(unix, feature = "rocksdb"))]
        Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
    }
}
/// Upserts a storage history entry (for incremental sync).
///
/// Inserts or overwrites the shard at `key`. For `RocksDB` upsert and append are the
/// same batched put; static files do not support this table.
pub fn upsert_storage_history(
    &mut self,
    key: StorageShardedKey,
    value: &BlockNumberList,
) -> ProviderResult<()> {
    match self {
        Self::Database(cursor) => Ok(cursor.upsert(key, value)?),
        Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
        #[cfg(all(unix, feature = "rocksdb"))]
        Self::RocksDB(batch) => batch.put::<tables::StoragesHistory>(key, value),
    }
}
/// Gets the last shard for an address and storage key (keyed with `u64::MAX`).
///
/// Returns `None` when no shard exists for the pair. Used by incremental sync to merge
/// new indices into the most recent shard; static files do not support this table.
pub fn get_last_storage_history_shard(
    &mut self,
    address: Address,
    storage_key: B256,
) -> ProviderResult<Option<BlockNumberList>> {
    // The final shard for a key is always stored under the u64::MAX sentinel.
    let key = StorageShardedKey::last(address, storage_key);
    match self {
        Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)),
        Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
        #[cfg(all(unix, feature = "rocksdb"))]
        Self::RocksDB(batch) => batch.get::<tables::StoragesHistory>(key),
    }
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>

View File

@@ -3005,25 +3005,35 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi
.collect::<Vec<_>>();
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
for &(address, storage_key, rem_index) in &storage_changesets {
let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
&mut cursor,
StorageShardedKey::last(address, storage_key),
rem_index,
|storage_sharded_key| {
storage_sharded_key.address == address &&
storage_sharded_key.sharded_key.key == storage_key
},
)?;
// Check the last returned partial shard.
// If it's not empty, the shard needs to be reinserted.
if !partial_shard.is_empty() {
cursor.insert(
if self.cached_storage_settings().storages_history_in_rocksdb {
#[cfg(all(unix, feature = "rocksdb"))]
{
let batch =
self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
self.pending_rocksdb_batches.lock().push(batch);
}
} else {
// Unwind the storage history index in MDBX.
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
for &(address, storage_key, rem_index) in &storage_changesets {
let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
&mut cursor,
StorageShardedKey::last(address, storage_key),
&BlockNumberList::new_pre_sorted(partial_shard),
rem_index,
|storage_sharded_key| {
storage_sharded_key.address == address &&
storage_sharded_key.sharded_key.key == storage_key
},
)?;
// Check the last returned partial shard.
// If it's not empty, the shard needs to be reinserted.
if !partial_shard.is_empty() {
cursor.insert(
StorageShardedKey::last(address, storage_key),
&BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
}

View File

@@ -814,6 +814,52 @@ impl RocksDBProvider {
Ok(result)
}
/// Returns all storage history shards for the given `(address, storage_key)` pair.
///
/// Scans forward from the smallest possible shard key for the pair, collecting entries
/// in ascending `highest_block_number` order, and stops as soon as a key for a different
/// `(address, storage_key)` is reached.
pub fn storage_history_shards(
    &self,
    address: Address,
    storage_key: B256,
) -> ProviderResult<Vec<(StorageShardedKey, BlockNumberList)>> {
    let cf = self.get_cf_handle::<tables::StoragesHistory>()?;
    // Seek position: highest_block_number = 0 is the lowest key for this pair.
    let first_key = StorageShardedKey::new(address, storage_key, 0u64).encode();

    let mut shards = Vec::new();
    for entry in
        self.0.iterator_cf(cf, IteratorMode::From(first_key.as_ref(), rocksdb::Direction::Forward))
    {
        // Surface RocksDB iteration failures as database read errors.
        let (key_bytes, value_bytes) = entry.map_err(|e| {
            ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
                message: e.to_string().into(),
                code: -1,
            }))
        })?;
        let key = StorageShardedKey::decode(&key_bytes)
            .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
        // The iterator is unbounded, so stop once we leave this (address, storage_key).
        if key.address != address || key.sharded_key.key != storage_key {
            break;
        }
        let value = BlockNumberList::decompress(&value_bytes)
            .map_err(|_| ProviderError::Database(DatabaseError::Decode))?;
        shards.push((key, value));
    }
    Ok(shards)
}
/// Unwinds account history indices for the given `(address, block_number)` pairs.
///
/// Groups addresses by their minimum block number and calls the appropriate unwind
@@ -846,6 +892,37 @@ impl RocksDBProvider {
Ok(batch.into_inner())
}
/// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples.
///
/// Groups by `(address, storage_key)` and finds the minimum block number for each.
/// For each key, keeps only blocks less than the minimum block
/// (i.e., removes the minimum block and all higher blocks).
///
/// Returns a `WriteBatchWithTransaction` that can be committed later.
pub fn unwind_storage_history_indices(
    &self,
    storage_changesets: &[(Address, B256, BlockNumber)],
) -> ProviderResult<WriteBatchWithTransaction<true>> {
    // Reduce the changeset list to the lowest touched block per (address, storage_key).
    let mut min_block_per_key: HashMap<(Address, B256), BlockNumber> =
        HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default());
    for &(address, storage_key, block_number) in storage_changesets {
        let lowest = min_block_per_key.entry((address, storage_key)).or_insert(block_number);
        if block_number < *lowest {
            *lowest = block_number;
        }
    }

    let mut batch = self.batch();
    for ((address, storage_key), min_block) in min_block_per_key {
        // Unwinding from `min_block` keeps only blocks strictly below it; a minimum
        // of 0 means the whole history for this slot must be cleared.
        if let Some(keep_to) = min_block.checked_sub(1) {
            batch.unwind_storage_history_to(address, storage_key, keep_to)?;
        } else {
            batch.clear_storage_history(address, storage_key)?;
        }
    }
    Ok(batch.into_inner())
}
/// Writes a batch of operations atomically.
#[instrument(level = "debug", target = "providers::rocksdb", skip_all)]
pub fn write_batch<F>(&self, f: F) -> ProviderResult<()>
@@ -1290,6 +1367,87 @@ impl<'a> RocksDBBatch<'a> {
Ok(())
}
/// Unwinds storage history to keep only blocks `<= keep_to`.
///
/// Handles multi-shard scenarios by:
/// 1. Loading all shards for the `(address, storage_key)` pair
/// 2. Finding the boundary shard containing `keep_to`
/// 3. Deleting all shards after the boundary
/// 4. Truncating the boundary shard to keep only indices `<= keep_to`
/// 5. Ensuring the last shard is keyed with `u64::MAX`
///
/// Relies on `storage_history_shards` returning shards in ascending
/// `highest_block_number` order, with the last (sentinel) shard keyed `u64::MAX`.
///
/// # Errors
///
/// Propagates any database error from loading shards or staging deletes/puts.
pub fn unwind_storage_history_to(
    &mut self,
    address: Address,
    storage_key: B256,
    keep_to: BlockNumber,
) -> ProviderResult<()> {
    let shards = self.provider.storage_history_shards(address, storage_key)?;
    // No history for this slot — nothing to unwind.
    if shards.is_empty() {
        return Ok(());
    }
    // Find the first shard that might contain blocks > keep_to.
    // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to
    let boundary_idx = shards.iter().position(|(key, _)| {
        key.sharded_key.highest_block_number == u64::MAX ||
            key.sharded_key.highest_block_number > keep_to
    });
    // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists
    let Some(boundary_idx) = boundary_idx else {
        let (last_key, last_value) = shards.last().expect("shards is non-empty");
        if last_key.sharded_key.highest_block_number != u64::MAX {
            // Re-key the final shard under the sentinel key (delete old key, put at MAX).
            self.delete::<tables::StoragesHistory>(last_key.clone())?;
            self.put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, storage_key),
                last_value,
            )?;
        }
        return Ok(());
    };
    // Delete all shards strictly after the boundary (they are entirely > keep_to)
    for (key, _) in shards.iter().skip(boundary_idx + 1) {
        self.delete::<tables::StoragesHistory>(key.clone())?;
    }
    // Process the boundary shard: filter out blocks > keep_to
    let (boundary_key, boundary_list) = &shards[boundary_idx];
    // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX)
    self.delete::<tables::StoragesHistory>(boundary_key.clone())?;
    // Build truncated list once; check emptiness directly (avoids double iteration)
    let new_last =
        BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to));
    if new_last.is_empty() {
        // Boundary shard is now empty. Previous shard becomes the last and must be keyed
        // u64::MAX.
        if boundary_idx == 0 {
            // Nothing left for this (address, storage_key) pair
            return Ok(());
        }
        let (prev_key, prev_value) = &shards[boundary_idx - 1];
        if prev_key.sharded_key.highest_block_number != u64::MAX {
            // Promote the previous shard to the sentinel position.
            self.delete::<tables::StoragesHistory>(prev_key.clone())?;
            self.put::<tables::StoragesHistory>(
                StorageShardedKey::last(address, storage_key),
                prev_value,
            )?;
        }
        return Ok(());
    }
    // The truncated boundary shard is now the last shard for this slot: store it
    // under the sentinel (u64::MAX) key.
    self.put::<tables::StoragesHistory>(
        StorageShardedKey::last(address, storage_key),
        &new_last,
    )?;
    Ok(())
}
/// Clears all account history shards for the given address.
///
/// Used when unwinding from block 0 (i.e., removing all history).
@@ -1300,6 +1458,21 @@ impl<'a> RocksDBBatch<'a> {
}
Ok(())
}
/// Clears all storage history shards for the given `(address, storage_key)` pair.
///
/// Used when unwinding from block 0 (i.e., removing all history for this storage slot).
pub fn clear_storage_history(
    &mut self,
    address: Address,
    storage_key: B256,
) -> ProviderResult<()> {
    // Stage a delete for every shard belonging to this slot. The shard list is
    // collected first (owned Vec), so the mutable borrow in the closure is fine.
    self.provider
        .storage_history_shards(address, storage_key)?
        .into_iter()
        .try_for_each(|(shard_key, _)| self.delete::<tables::StoragesHistory>(shard_key))
}
}
/// `RocksDB` transaction wrapper providing MDBX-like semantics.