Compare commits

...

27 Commits

Author SHA1 Message Date
yongkangc
54a673abc1 merge: origin/main 2026-01-22 16:15:20 +00:00
yongkangc
7860e96420 docs: expand sharding explanation with key structure 2026-01-22 11:58:00 +00:00
yongkangc
d43ec7dc43 docs: explain what a shard is 2026-01-22 11:56:39 +00:00
yongkangc
e417df5b4f docs: add comments to flush_shards_partial 2026-01-22 11:54:45 +00:00
yongkangc
495ce0a44e refactor: add type alias for Prefix to improve readability 2026-01-22 11:53:33 +00:00
yongkangc
49b59241cd docs: add inline comments to load_sharded_history 2026-01-22 11:52:32 +00:00
yongkangc
ed36eb7197 test: add symmetric shard boundary tests for storage history
- Add insert_index_exact_shard_boundary for storage history
- Add insert_index_incremental_merge_overflow for storage history
- Fix clippy warnings in account history tests (backticks, redundant clone)
2026-01-22 11:46:01 +00:00
yongkangc
465614af5c test: add shard boundary and incremental merge overflow tests
- Add debug_assert in flush_shards_partial to verify flush_len <= list.len()
- Add insert_index_exact_shard_boundary test for exact k * NUM_OF_INDICES_IN_SHARD
- Add insert_index_incremental_merge_overflow test for u64::MAX sentinel rewrite
2026-01-22 11:34:28 +00:00
yongkangc
9bb1183201 chore: remove accidental binary 2026-01-22 11:24:23 +00:00
yongkangc
fa3355cc8f refactor(stages): add HistoryShardWriter trait for history shard deduplication
Replace closure-based load_sharded_history with trait-based approach:
- Add HistoryShardWriter trait with PrefixKey/TableKey associated types
- Add AccountHistoryShardWriter adapter wrapping EitherWriter
- Add StorageHistoryShardWriter adapter wrapping EitherWriter
- Pre-allocate current_list to avoid growth allocations in hot path

Preserves all invariants:
- u64::MAX sentinel for final shard key
- append vs upsert semantics for MDBX/RocksDB backends
- Sorted key requirement for append_only mode
- Incremental sync merging via get_last_shard

Amp-Thread-ID: https://ampcode.com/threads/T-019be4f6-c681-73ea-ab9b-89403daec00a
2026-01-22 09:21:02 +00:00
yongkangc
2f0e600a0a merge: resolve conflicts with origin/main 2026-01-22 09:11:07 +00:00
Georgios Konstantopoulos
e7258b9ffe chore: trim verbose comments and fix split_off allocation
- Remove redundant inline comments that restate the code
- Simplify doc comments on wrapper functions
- Replace split_off + reassign with copy_within + truncate (avoids allocation)
2026-01-21 14:03:12 +00:00
Georgios Konstantopoulos
74e5b13fe8 fix: address clippy warnings in utils.rs 2026-01-21 09:08:52 +00:00
Georgios Konstantopoulos
4ce5969f91 style: fix formatting in utils.rs 2026-01-21 08:59:06 +00:00
Georgios Konstantopoulos
c328a2ef46 refactor(stages): deduplicate account/storage history shard loading
Introduces a generic `load_sharded_history` function that consolidates the
duplicated shard-loading logic between account history and storage history
stages. This reduces ~150 lines of duplicated code.

The refactor:
- Creates a generic function with closure parameters for table-specific ops
- `load_account_history` and `load_storage_history` are now thin wrappers
- Preserves all invariants: u64::MAX sentinel, append vs upsert, partial buffering
- Both MDBX and RocksDB backends continue to work via EitherWriter

No behavioral changes - this is a pure refactor for maintainability.
2026-01-21 08:53:00 +00:00
yongkangc
05187c3a74 fix: remove unresolved doc links to deleted load_indices/LoadMode 2026-01-20 18:52:37 +00:00
yongkangc
1f3a4cadea refactor: extract unwind_storage_history_indices to RocksDBProvider
Match the same pattern as unwind_account_history_indices - move the
HashMap grouping and batch logic from DatabaseProvider into
RocksDBProvider for consistency.
2026-01-20 18:47:39 +00:00
yongkangc
17f7cbfde0 refactor: remove dead code and fix rocksdb iterator path
- Remove unused load_history_indices, load_indices, and LoadMode
  (replaced by EitherWriter-based load_account_history and load_storage_history)
- Fix storage_history_shards iterator to use self.0 instead of self.0.db
- Update doc comments to remove references to deleted functions
2026-01-20 18:36:45 +00:00
yongkangc
f68db568ff chore: align with account history PR format
- Add RocksDB clear() safety comment in storage history stage
- Remove blank line between first_sync and use_rocksdb declarations
- Add append_account_history, upsert_account_history, get_last_account_history_shard
  to EitherWriter for parity with account history PR
- Update test to use upsert_account_history instead of put_account_history
2026-01-20 18:32:12 +00:00
yongkangc
f4820c5d8e refactor: use with_rocksdb_batch pattern for storage history
- Add with_rocksdb_batch method to RocksDBProviderFactory trait
- Refactor storage history stage to use with_rocksdb_batch like account history
- This removes explicit #[cfg] blocks in the stage for cleaner code
2026-01-20 18:32:12 +00:00
yongkangc
a3371eaf9d chore: align storage history with account history format
- Rename load_storage_history_via_writer to load_storage_history (matching account history pattern)
- Fix doc comment: 'account changesets' -> 'storage changesets'
- Add inline comments throughout load_storage_history matching the account history style
- Add doc comments for flush helpers referencing LoadMode equivalents
- Add inline comments explaining shard flushing logic and u64::MAX invariant
2026-01-20 18:32:12 +00:00
yongkangc
b29684ff9d chore: remove redundant comment 2026-01-20 18:32:12 +00:00
yongkangc
e8229f89eb chore: add comments to unwind_storage_history_to for parity with account history 2026-01-20 18:32:12 +00:00
yongkangc
2d05085c43 refactor: align unwind_storage_history_to with account history pattern
Use is_empty() check instead of separate count() to avoid double iteration,
matching the unwind_account_history_to implementation.
2026-01-20 18:32:12 +00:00
yongkangc
b74f9326f3 fix: backtick RocksDB in doc comment 2026-01-20 18:32:12 +00:00
yongkangc
f01bb9bacc fix: move RocksDB storage history unwind to provider layer
- Add storage_history_shards() to RocksDBProvider for multi-shard enumeration
- Add unwind_storage_history_to() and clear_storage_history() to RocksDBBatch
- Update DatabaseProvider::unwind_storage_history_indices to route to RocksDB
- Simplify stage unwind to delegate to provider (matching account history pattern)
- Add multi-shard unwind test
- Use checked_sub(1) for block 0 edge case
- Remove now-unused unwind_storage_history_shard from EitherWriter
2026-01-20 18:32:12 +00:00
yongkangc
396f33372f feat(stages): add RocksDB support for IndexStorageHistoryStage
This implements RocksDB support for the IndexStorageHistoryStage following
the pattern established in #21165 for IndexAccountHistoryStage:

- Add RocksDBProviderFactory and StorageSettingsCache trait bounds to Stage impl
- Use EitherWriter::new_storages_history with explicit #[cfg] blocks for batch creation
- Add helper functions for loading/flushing storage history shards:
  - load_storage_history_via_writer - iterate collector, merge with existing shards
  - flush_storage_history_shards_partial - write full shards, keep partial in memory
  - flush_storage_history_shards - write all remaining shards with u64::MAX for last
- Add EitherWriter methods:
  - append_storage_history, upsert_storage_history, get_last_storage_history_shard
  - unwind_storage_history_shard - handles RocksDB shard unwinding
- Add RocksDBProvider::clear() and RocksDBBatch::get() methods
- Route unwind to RocksDB when storages_history_in_rocksdb is enabled
- Add rocksdb_tests module with tests for:
  - execute_writes_to_rocksdb_when_enabled
  - unwind_deletes_from_rocksdb_when_enabled
  - unwind_to_zero_keeps_block_zero
  - execute_incremental_sync

Part of #20593 - Move secondary indices to RocksDB
2026-01-20 18:32:11 +00:00
6 changed files with 489 additions and 342 deletions

View File

@@ -517,6 +517,95 @@ mod tests {
assert!(table.is_empty());
}
/// Tests exact shard boundary: exactly k * `NUM_OF_INDICES_IN_SHARD` entries.
/// Verifies the final shard correctly uses `u64::MAX` as sentinel key when
/// the entry count is an exact multiple of shard size.
#[tokio::test]
async fn insert_index_exact_shard_boundary() {
let db = TestStageDB::default();
db.commit(|tx| {
for block in 0..NUM_OF_INDICES_IN_SHARD as u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();
run(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, None);
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
let expected_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
assert_eq!(table.len(), 1, "Should have exactly one shard");
assert_eq!(
table,
BTreeMap::from([(shard(u64::MAX), expected_blocks)]),
"Final shard key should be u64::MAX"
);
unwind(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, 0);
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
}
/// Tests incremental merge overflow: existing full shard gets converted
/// from `u64::MAX` sentinel to actual highest block, and new entries
/// create a new final shard with `u64::MAX`.
#[tokio::test]
async fn insert_index_incremental_merge_overflow() {
let db = TestStageDB::default();
let first_shard_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
db.commit(|tx| {
for block in 0..(NUM_OF_INDICES_IN_SHARD + 5) as u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&first_shard_blocks))?;
Ok(())
})
.unwrap();
let last_block = (NUM_OF_INDICES_IN_SHARD + 4) as u64;
run(&db, last_block, Some((NUM_OF_INDICES_IN_SHARD - 1) as u64));
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
assert_eq!(table.len(), 2, "Should have two shards after overflow");
let new_shard_blocks: Vec<u64> =
(NUM_OF_INDICES_IN_SHARD as u64..(NUM_OF_INDICES_IN_SHARD + 5) as u64).collect();
assert_eq!(
table.get(&shard((NUM_OF_INDICES_IN_SHARD - 1) as u64)),
Some(&first_shard_blocks),
"First shard should have highest_block = last entry"
);
assert_eq!(
table.get(&shard(u64::MAX)),
Some(&new_shard_blocks),
"New final shard should have u64::MAX key"
);
unwind(&db, last_block, (NUM_OF_INDICES_IN_SHARD - 1) as u64);
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(u64::MAX), first_shard_blocks)]),
"After unwind, should revert to single shard with u64::MAX"
);
}
stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history);
struct IndexAccountHistoryTestRunner {

View File

@@ -537,6 +537,101 @@ mod tests {
assert!(table.is_empty());
}
/// Tests exact shard boundary: exactly k * `NUM_OF_INDICES_IN_SHARD` entries.
/// Verifies the final shard correctly uses `u64::MAX` as sentinel key when
/// the entry count is an exact multiple of shard size.
#[tokio::test]
async fn insert_index_exact_shard_boundary() {
let db = TestStageDB::default();
db.commit(|tx| {
for block in 0..NUM_OF_INDICES_IN_SHARD as u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
Ok(())
})
.unwrap();
run(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, None);
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
let expected_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
assert_eq!(table.len(), 1, "Should have exactly one shard");
assert_eq!(
table,
BTreeMap::from([(shard(u64::MAX), expected_blocks)]),
"Final shard key should be u64::MAX"
);
unwind(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, 0);
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
}
/// Tests incremental merge overflow: existing full shard gets converted
/// from `u64::MAX` sentinel to actual highest block, and new entries
/// create a new final shard with `u64::MAX`.
#[tokio::test]
async fn insert_index_incremental_merge_overflow() {
let db = TestStageDB::default();
let first_shard_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
db.commit(|tx| {
for block in 0..(NUM_OF_INDICES_IN_SHARD + 5) as u64 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
)?;
tx.put::<tables::StorageChangeSets>(
block_number_address(block),
storage(STORAGE_KEY),
)?;
}
tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&first_shard_blocks))?;
Ok(())
})
.unwrap();
let last_block = (NUM_OF_INDICES_IN_SHARD + 4) as u64;
run(&db, last_block, Some((NUM_OF_INDICES_IN_SHARD - 1) as u64));
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(table.len(), 2, "Should have two shards after overflow");
let new_shard_blocks: Vec<u64> =
(NUM_OF_INDICES_IN_SHARD as u64..(NUM_OF_INDICES_IN_SHARD + 5) as u64).collect();
assert_eq!(
table.get(&shard((NUM_OF_INDICES_IN_SHARD - 1) as u64)),
Some(&first_shard_blocks),
"First shard should have highest_block = last entry"
);
assert_eq!(
table.get(&shard(u64::MAX)),
Some(&new_shard_blocks),
"New final shard should have u64::MAX key"
);
unwind(&db, last_block, (NUM_OF_INDICES_IN_SHARD - 1) as u64);
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
assert_eq!(
table,
BTreeMap::from([(shard(u64::MAX), first_shard_blocks)]),
"After unwind, should revert to single shard with u64::MAX"
);
}
stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history);
struct IndexStorageHistoryTestRunner {

View File

@@ -1,13 +1,13 @@
//! Utils for `stages`.
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use alloy_primitives::{Address, BlockNumber, TxNumber};
use reth_config::config::EtlConfig;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedHistoryKey, ShardedKey,
},
table::{Decode, Decompress, Table},
table::{Decode, Decompress, Key, Table},
transaction::DbTx,
BlockNumberList,
};
@@ -15,7 +15,7 @@ use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
StaticFileProviderFactory,
ProviderResult, StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
@@ -23,6 +23,169 @@ use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;
/// Trait for writing sharded history indices to the database.
pub(crate) trait HistoryShardWriter {
/// The full sharded key type for the table.
type TableKey: Key + ShardedHistoryKey;
/// Gets the last shard for a prefix (for incremental sync merging).
fn get_last_shard(
&mut self,
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
) -> ProviderResult<Option<BlockNumberList>>;
/// Writes a shard to the database (append or upsert based on flag).
fn write_shard(
&mut self,
key: Self::TableKey,
value: &BlockNumberList,
append: bool,
) -> ProviderResult<()>;
}
/// Loads sharded history indices from a collector into the database.
///
/// ## Why sharding?
/// History indices track "which blocks modified this address/storage slot". A popular contract
/// may have millions of changes, too large for a single DB value. We split into shards of
/// `NUM_OF_INDICES_IN_SHARD` (2000) block numbers each.
///
/// ## Key structure
/// Each shard is keyed by `(prefix, highest_block_in_shard)`. Example for an address:
/// - `(0xABC..., 5000)` → blocks 3001-5000
/// - `(0xABC..., u64::MAX)` → blocks 5001-6234 (final shard)
///
/// The `u64::MAX` sentinel on the last shard enables `seek_exact(prefix, u64::MAX)` to find
/// it for incremental sync merging.
///
/// When `append_only=true`, collector must yield keys in ascending order (MDBX requirement).
fn load_sharded_history<H: HistoryShardWriter>(
collector: &mut Collector<H::TableKey, BlockNumberList>,
append_only: bool,
writer: &mut H,
) -> Result<(), StageError> {
type Prefix<H> = <<H as HistoryShardWriter>::TableKey as ShardedHistoryKey>::Prefix;
// Option needed to distinguish "no prefix yet" from "processing Address::ZERO"
let mut current_prefix: Option<Prefix<H>> = None;
// Buffer for block numbers; sized for ~2 shards to minimize reallocations
let mut current_list = Vec::<u64>::with_capacity(NUM_OF_INDICES_IN_SHARD * 2);
// Progress reporting setup
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = H::TableKey::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
}
let prefix = sharded_key.prefix();
// When prefix changes, flush previous prefix's shards and start fresh
if current_prefix != Some(prefix) {
// Flush remaining shards for the previous prefix (uses u64::MAX for final shard)
if let Some(prev_prefix) = current_prefix {
flush_shards::<H>(prev_prefix, &mut current_list, append_only, writer)?;
}
current_prefix = Some(prefix);
current_list.clear();
// On incremental sync, merge with existing last shard (stored with u64::MAX key)
if !append_only && let Some(last_shard) = writer.get_last_shard(prefix)? {
current_list.extend(last_shard.iter());
}
}
// Accumulate new block numbers
current_list.extend(new_list.iter());
// Flush complete shards while keeping one buffered for continued accumulation
flush_shards_partial::<H>(prefix, &mut current_list, append_only, writer)?;
}
// Flush final prefix's remaining shard
if let Some(prefix) = current_prefix {
flush_shards::<H>(prefix, &mut current_list, append_only, writer)?;
}
Ok(())
}
/// Flushes complete shards, keeping at least one shard buffered for continued accumulation.
///
/// We buffer one shard because `flush_shards` uses `u64::MAX` as the final shard's key.
/// If we flushed everything here, we'd write `u64::MAX` keys that get overwritten later.
fn flush_shards_partial<H: HistoryShardWriter>(
prefix: <H::TableKey as ShardedHistoryKey>::Prefix,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut H,
) -> Result<(), StageError> {
// Not enough to fill a shard yet
if list.len() <= NUM_OF_INDICES_IN_SHARD {
return Ok(());
}
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
// Keep one shard buffered: if exact multiple, keep last full shard for u64::MAX key later
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
num_full_shards - 1
} else {
num_full_shards
};
if shards_to_flush == 0 {
return Ok(());
}
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
debug_assert!(flush_len <= list.len(), "flush_len exceeds list length");
// Write complete shards with their actual highest block number as key
for chunk in list[..flush_len].chunks(NUM_OF_INDICES_IN_SHARD) {
let highest = *chunk.last().expect("chunk is non-empty");
let key = H::TableKey::new_sharded(prefix, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
writer.write_shard(key, &value, append_only)?;
}
// Shift remaining elements to front (avoids allocation vs split_off)
list.copy_within(flush_len.., 0);
list.truncate(list.len() - flush_len);
Ok(())
}
/// Flushes all remaining shards. Uses `u64::MAX` for the final shard's key to enable
/// incremental sync lookups via `seek_exact(prefix, u64::MAX)`.
fn flush_shards<H: HistoryShardWriter>(
prefix: <H::TableKey as ShardedHistoryKey>::Prefix,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut H,
) -> Result<(), StageError> {
if list.is_empty() {
return Ok(());
}
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
let is_last = i == num_chunks - 1;
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
let key = H::TableKey::new_sharded(prefix, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
writer.write_shard(key, &value, append_only)?;
}
list.clear();
Ok(())
}
/// Number of blocks before pushing indices from cache to [`Collector`]
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
@@ -230,17 +393,40 @@ where
Ok(collector)
}
/// Loads account history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their address. It flushes indices to disk
/// when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
/// ensuring the last previous address shard is stored.
///
/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
/// incorrectly treating `Address::ZERO` as "no previous address".
/// Adapter for writing account history shards via `EitherWriter`.
struct AccountHistoryShardWriter<'a, 'tx, CURSOR, N> {
writer: &'a mut EitherWriter<'tx, CURSOR, N>,
}
impl<CURSOR, N: NodePrimitives> HistoryShardWriter for AccountHistoryShardWriter<'_, '_, CURSOR, N>
where
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
type TableKey = ShardedKey<Address>;
fn get_last_shard(
&mut self,
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
) -> ProviderResult<Option<BlockNumberList>> {
self.writer.get_last_account_history_shard(prefix)
}
fn write_shard(
&mut self,
key: Self::TableKey,
value: &BlockNumberList,
append: bool,
) -> ProviderResult<()> {
if append {
self.writer.append_account_history(key, value)
} else {
self.writer.upsert_account_history(key, value)
}
}
}
/// Loads account history indices from the collector into the database.
pub(crate) fn load_account_history<N, CURSOR>(
mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
append_only: bool,
@@ -251,155 +437,8 @@ where
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
let mut current_address: Option<Address> = None;
// Accumulator for block numbers where the current address changed.
let mut current_list = Vec::<u64>::new();
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
}
let address = sharded_key.key;
// When address changes, flush the previous address's shards and start fresh.
if current_address != Some(address) {
// Flush all remaining shards for the previous address (uses u64::MAX for last shard).
if let Some(prev_addr) = current_address {
flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
}
current_address = Some(address);
current_list.clear();
// On incremental sync, merge with the existing last shard from the database.
// The last shard is stored with key (address, u64::MAX) so we can find it.
if !append_only &&
let Some(last_shard) = writer.get_last_account_history_shard(address)?
{
current_list.extend(last_shard.iter());
}
}
// Append new block numbers to the accumulator.
current_list.extend(new_list.iter());
// Flush complete shards, keeping the last (partial) shard buffered.
flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
}
// Flush the final address's remaining shard.
if let Some(addr) = current_address {
flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
}
Ok(())
}
/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
///
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
fn flush_account_history_shards_partial<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
// Nothing to flush if we haven't filled a complete shard yet.
if list.len() <= NUM_OF_INDICES_IN_SHARD {
return Ok(());
}
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
// Always keep at least one shard buffered for continued accumulation.
// If len is exact multiple of shard size, keep the last full shard.
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
num_full_shards - 1
} else {
num_full_shards
};
if shards_to_flush == 0 {
return Ok(());
}
// Split: flush the first N shards, keep the remainder buffered.
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
let remainder = list.split_off(flush_len);
// Write each complete shard with its highest block number as the key.
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
let highest = *chunk.last().expect("chunk is non-empty");
let key = ShardedKey::new(address, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_account_history(key, &value)?;
} else {
writer.upsert_account_history(key, &value)?;
}
}
// Keep the remaining indices for the next iteration.
*list = remainder;
Ok(())
}
/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
///
/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
fn flush_account_history_shards<N, CURSOR>(
address: Address,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
if list.is_empty() {
return Ok(());
}
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
let is_last = i == num_chunks - 1;
// Use u64::MAX for the final shard's key. This invariant allows incremental sync
// to find the last shard via seek_exact(address, u64::MAX) for merging.
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
let key = ShardedKey::new(address, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_account_history(key, &value)?;
} else {
writer.upsert_account_history(key, &value)?;
}
}
list.clear();
Ok(())
let mut adapter = AccountHistoryShardWriter { writer };
load_sharded_history(&mut collector, append_only, &mut adapter)
}
/// Called when database is ahead of static files. Attempts to find the first block we are missing
@@ -438,17 +477,40 @@ where
})
}
/// Loads storage history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes
/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the
/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored.
///
/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
/// Adapter for writing storage history shards via `EitherWriter`.
struct StorageHistoryShardWriter<'a, 'tx, CURSOR, N> {
writer: &'a mut EitherWriter<'tx, CURSOR, N>,
}
impl<CURSOR, N: NodePrimitives> HistoryShardWriter for StorageHistoryShardWriter<'_, '_, CURSOR, N>
where
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
type TableKey = StorageShardedKey;
fn get_last_shard(
&mut self,
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
) -> ProviderResult<Option<BlockNumberList>> {
self.writer.get_last_storage_history_shard(prefix.0, prefix.1)
}
fn write_shard(
&mut self,
key: Self::TableKey,
value: &BlockNumberList,
append: bool,
) -> ProviderResult<()> {
if append {
self.writer.append_storage_history(key, value)
} else {
self.writer.upsert_storage_history(key, value)
}
}
}
/// Loads storage history indices from the collector into the database.
pub(crate) fn load_storage_history<N, CURSOR>(
mut collector: Collector<StorageShardedKey, BlockNumberList>,
append_only: bool,
@@ -459,169 +521,6 @@ where
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
let mut current_key: Option<(Address, B256)> = None;
// Accumulator for block numbers where the current (address, storage_key) changed.
let mut current_list = Vec::<u64>::new();
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);
for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = StorageShardedKey::decode_owned(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
}
let partial_key = (sharded_key.address, sharded_key.sharded_key.key);
// When (address, storage_key) changes, flush the previous key's shards and start fresh.
if current_key != Some(partial_key) {
// Flush all remaining shards for the previous key (uses u64::MAX for last shard).
if let Some((prev_addr, prev_storage_key)) = current_key {
flush_storage_history_shards(
prev_addr,
prev_storage_key,
&mut current_list,
append_only,
writer,
)?;
}
current_key = Some(partial_key);
current_list.clear();
// On incremental sync, merge with the existing last shard from the database.
// The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
if !append_only &&
let Some(last_shard) =
writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
{
current_list.extend(last_shard.iter());
}
}
// Append new block numbers to the accumulator.
current_list.extend(new_list.iter());
// Flush complete shards, keeping the last (partial) shard buffered.
flush_storage_history_shards_partial(
partial_key.0,
partial_key.1,
&mut current_list,
append_only,
writer,
)?;
}
// Flush the final key's remaining shard.
if let Some((addr, storage_key)) = current_key {
flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
}
Ok(())
}
/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
///
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
fn flush_storage_history_shards_partial<N, CURSOR>(
address: Address,
storage_key: B256,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
// Nothing to flush if we haven't filled a complete shard yet.
if list.len() <= NUM_OF_INDICES_IN_SHARD {
return Ok(());
}
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
// Always keep at least one shard buffered for continued accumulation.
// If len is exact multiple of shard size, keep the last full shard.
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
num_full_shards - 1
} else {
num_full_shards
};
if shards_to_flush == 0 {
return Ok(());
}
// Split: flush the first N shards, keep the remainder buffered.
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
let remainder = list.split_off(flush_len);
// Write each complete shard with its highest block number as the key.
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
let highest = *chunk.last().expect("chunk is non-empty");
let key = StorageShardedKey::new(address, storage_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_storage_history(key, &value)?;
} else {
writer.upsert_storage_history(key, &value)?;
}
}
// Keep the remaining indices for the next iteration.
*list = remainder;
Ok(())
}
/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard.
///
/// The `u64::MAX` key for the final shard is an invariant that allows
/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental
/// sync for merging with new indices.
fn flush_storage_history_shards<N, CURSOR>(
address: Address,
storage_key: B256,
list: &mut Vec<u64>,
append_only: bool,
writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
if list.is_empty() {
return Ok(());
}
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
let is_last = i == num_chunks - 1;
// Use u64::MAX for the final shard's key. This invariant allows incremental sync
// to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging.
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
let key = StorageShardedKey::new(address, storage_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
if append_only {
writer.append_storage_history(key, &value)?;
} else {
writer.upsert_storage_history(key, &value)?;
}
}
list.clear();
Ok(())
let mut adapter = StorageHistoryShardWriter { writer };
load_sharded_history(&mut collector, append_only, &mut adapter)
}

View File

@@ -32,7 +32,7 @@ pub use reth_db_models::{
AccountBeforeTx, ClientVersion, StaticFileBlockWithdrawals, StorageBeforeTx,
StoredBlockBodyIndices, StoredBlockWithdrawals,
};
pub use sharded_key::ShardedKey;
pub use sharded_key::{ShardedHistoryKey, ShardedKey};
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
macro_rules! impl_uints {

View File

@@ -10,6 +10,21 @@ use std::hash::Hash;
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
/// Trait for sharded history keys that can be constructed from a prefix + block number.
///
/// This abstracts over the key construction and prefix extraction for sharded history tables
/// like `AccountsHistory` and `StoragesHistory`.
pub trait ShardedHistoryKey: Sized {
/// The prefix type (e.g., `Address` for accounts, `(Address, B256)` for storage).
type Prefix: Copy + Eq;
/// Creates a new sharded key from prefix and highest block number.
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self;
/// Extracts the prefix from this key.
fn prefix(&self) -> Self::Prefix;
}
/// Size of `BlockNumber` in bytes (u64 = 8 bytes).
const BLOCK_NUMBER_SIZE: usize = std::mem::size_of::<BlockNumber>();
@@ -77,6 +92,20 @@ impl Decode for ShardedKey<Address> {
}
}
impl ShardedHistoryKey for ShardedKey<Address> {
type Prefix = Address;
#[inline]
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self {
Self::new(prefix, highest_block_number)
}
#[inline]
fn prefix(&self) -> Self::Prefix {
self.key
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -110,4 +139,14 @@ mod tests {
let decoded = ShardedKey::<Address>::decode(&encoded).unwrap();
assert_eq!(decoded.highest_block_number, u64::MAX);
}
#[test]
fn sharded_history_key_trait_roundtrip() {
let addr = address!("0102030405060708091011121314151617181920");
let block_num = 0x123456789ABCDEFu64;
let key = ShardedKey::<Address>::new_sharded(addr, block_num);
assert_eq!(key.prefix(), addr);
assert_eq!(key.highest_block_number, block_num);
}
}

View File

@@ -7,7 +7,7 @@ use alloy_primitives::{Address, BlockNumber, B256};
use derive_more::AsRef;
use serde::{Deserialize, Serialize};
use super::ShardedKey;
use super::{ShardedHistoryKey, ShardedKey};
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
@@ -91,6 +91,20 @@ impl Decode for StorageShardedKey {
}
}
impl ShardedHistoryKey for StorageShardedKey {
type Prefix = (Address, B256);
#[inline]
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self {
Self::new(prefix.0, prefix.1, highest_block_number)
}
#[inline]
fn prefix(&self) -> Self::Prefix {
(self.address, self.sharded_key.key)
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -130,4 +144,15 @@ mod tests {
let decoded = StorageShardedKey::decode(&encoded).unwrap();
assert_eq!(decoded.sharded_key.highest_block_number, u64::MAX);
}
#[test]
fn sharded_history_key_trait_roundtrip() {
let addr = address!("0102030405060708091011121314151617181920");
let storage_key = b256!("0001020304050607080910111213141516171819202122232425262728293031");
let block_num = 0x123456789ABCDEFu64;
let key = StorageShardedKey::new_sharded((addr, storage_key), block_num);
assert_eq!(key.prefix(), (addr, storage_key));
assert_eq!(key.sharded_key.highest_block_number, block_num);
}
}