mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
54a673abc1 | ||
|
|
7860e96420 | ||
|
|
d43ec7dc43 | ||
|
|
e417df5b4f | ||
|
|
495ce0a44e | ||
|
|
49b59241cd | ||
|
|
ed36eb7197 | ||
|
|
465614af5c | ||
|
|
9bb1183201 | ||
|
|
fa3355cc8f | ||
|
|
2f0e600a0a | ||
|
|
e7258b9ffe | ||
|
|
74e5b13fe8 | ||
|
|
4ce5969f91 | ||
|
|
c328a2ef46 | ||
|
|
05187c3a74 | ||
|
|
1f3a4cadea | ||
|
|
17f7cbfde0 | ||
|
|
f68db568ff | ||
|
|
f4820c5d8e | ||
|
|
a3371eaf9d | ||
|
|
b29684ff9d | ||
|
|
e8229f89eb | ||
|
|
2d05085c43 | ||
|
|
b74f9326f3 | ||
|
|
f01bb9bacc | ||
|
|
396f33372f |
@@ -517,6 +517,95 @@ mod tests {
|
||||
assert!(table.is_empty());
|
||||
}
|
||||
|
||||
/// Tests exact shard boundary: exactly k * `NUM_OF_INDICES_IN_SHARD` entries.
|
||||
/// Verifies the final shard correctly uses `u64::MAX` as sentinel key when
|
||||
/// the entry count is an exact multiple of shard size.
|
||||
#[tokio::test]
|
||||
async fn insert_index_exact_shard_boundary() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 0..NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
run(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, None);
|
||||
|
||||
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
|
||||
let expected_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
|
||||
assert_eq!(table.len(), 1, "Should have exactly one shard");
|
||||
assert_eq!(
|
||||
table,
|
||||
BTreeMap::from([(shard(u64::MAX), expected_blocks)]),
|
||||
"Final shard key should be u64::MAX"
|
||||
);
|
||||
|
||||
unwind(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, 0);
|
||||
|
||||
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
|
||||
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
|
||||
}
|
||||
|
||||
/// Tests incremental merge overflow: existing full shard gets converted
|
||||
/// from `u64::MAX` sentinel to actual highest block, and new entries
|
||||
/// create a new final shard with `u64::MAX`.
|
||||
#[tokio::test]
|
||||
async fn insert_index_incremental_merge_overflow() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
let first_shard_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 0..(NUM_OF_INDICES_IN_SHARD + 5) as u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::AccountChangeSets>(block, acc())?;
|
||||
}
|
||||
|
||||
tx.put::<tables::AccountsHistory>(shard(u64::MAX), list(&first_shard_blocks))?;
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let last_block = (NUM_OF_INDICES_IN_SHARD + 4) as u64;
|
||||
run(&db, last_block, Some((NUM_OF_INDICES_IN_SHARD - 1) as u64));
|
||||
|
||||
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
|
||||
assert_eq!(table.len(), 2, "Should have two shards after overflow");
|
||||
|
||||
let new_shard_blocks: Vec<u64> =
|
||||
(NUM_OF_INDICES_IN_SHARD as u64..(NUM_OF_INDICES_IN_SHARD + 5) as u64).collect();
|
||||
|
||||
assert_eq!(
|
||||
table.get(&shard((NUM_OF_INDICES_IN_SHARD - 1) as u64)),
|
||||
Some(&first_shard_blocks),
|
||||
"First shard should have highest_block = last entry"
|
||||
);
|
||||
assert_eq!(
|
||||
table.get(&shard(u64::MAX)),
|
||||
Some(&new_shard_blocks),
|
||||
"New final shard should have u64::MAX key"
|
||||
);
|
||||
|
||||
unwind(&db, last_block, (NUM_OF_INDICES_IN_SHARD - 1) as u64);
|
||||
|
||||
let table = cast(db.table::<tables::AccountsHistory>().unwrap());
|
||||
assert_eq!(
|
||||
table,
|
||||
BTreeMap::from([(shard(u64::MAX), first_shard_blocks)]),
|
||||
"After unwind, should revert to single shard with u64::MAX"
|
||||
);
|
||||
}
|
||||
|
||||
stage_test_suite_ext!(IndexAccountHistoryTestRunner, index_account_history);
|
||||
|
||||
struct IndexAccountHistoryTestRunner {
|
||||
|
||||
@@ -537,6 +537,101 @@ mod tests {
|
||||
assert!(table.is_empty());
|
||||
}
|
||||
|
||||
/// Tests exact shard boundary: exactly k * `NUM_OF_INDICES_IN_SHARD` entries.
|
||||
/// Verifies the final shard correctly uses `u64::MAX` as sentinel key when
|
||||
/// the entry count is an exact multiple of shard size.
|
||||
#[tokio::test]
|
||||
async fn insert_index_exact_shard_boundary() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 0..NUM_OF_INDICES_IN_SHARD as u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
run(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, None);
|
||||
|
||||
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
|
||||
let expected_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
|
||||
assert_eq!(table.len(), 1, "Should have exactly one shard");
|
||||
assert_eq!(
|
||||
table,
|
||||
BTreeMap::from([(shard(u64::MAX), expected_blocks)]),
|
||||
"Final shard key should be u64::MAX"
|
||||
);
|
||||
|
||||
unwind(&db, (NUM_OF_INDICES_IN_SHARD - 1) as u64, 0);
|
||||
|
||||
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
|
||||
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![0])]));
|
||||
}
|
||||
|
||||
/// Tests incremental merge overflow: existing full shard gets converted
|
||||
/// from `u64::MAX` sentinel to actual highest block, and new entries
|
||||
/// create a new final shard with `u64::MAX`.
|
||||
#[tokio::test]
|
||||
async fn insert_index_incremental_merge_overflow() {
|
||||
let db = TestStageDB::default();
|
||||
|
||||
let first_shard_blocks: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64).collect();
|
||||
|
||||
db.commit(|tx| {
|
||||
for block in 0..(NUM_OF_INDICES_IN_SHARD + 5) as u64 {
|
||||
tx.put::<tables::BlockBodyIndices>(
|
||||
block,
|
||||
StoredBlockBodyIndices { tx_count: 1, ..Default::default() },
|
||||
)?;
|
||||
tx.put::<tables::StorageChangeSets>(
|
||||
block_number_address(block),
|
||||
storage(STORAGE_KEY),
|
||||
)?;
|
||||
}
|
||||
|
||||
tx.put::<tables::StoragesHistory>(shard(u64::MAX), list(&first_shard_blocks))?;
|
||||
Ok(())
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let last_block = (NUM_OF_INDICES_IN_SHARD + 4) as u64;
|
||||
run(&db, last_block, Some((NUM_OF_INDICES_IN_SHARD - 1) as u64));
|
||||
|
||||
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
|
||||
assert_eq!(table.len(), 2, "Should have two shards after overflow");
|
||||
|
||||
let new_shard_blocks: Vec<u64> =
|
||||
(NUM_OF_INDICES_IN_SHARD as u64..(NUM_OF_INDICES_IN_SHARD + 5) as u64).collect();
|
||||
|
||||
assert_eq!(
|
||||
table.get(&shard((NUM_OF_INDICES_IN_SHARD - 1) as u64)),
|
||||
Some(&first_shard_blocks),
|
||||
"First shard should have highest_block = last entry"
|
||||
);
|
||||
assert_eq!(
|
||||
table.get(&shard(u64::MAX)),
|
||||
Some(&new_shard_blocks),
|
||||
"New final shard should have u64::MAX key"
|
||||
);
|
||||
|
||||
unwind(&db, last_block, (NUM_OF_INDICES_IN_SHARD - 1) as u64);
|
||||
|
||||
let table = cast(db.table::<tables::StoragesHistory>().unwrap());
|
||||
assert_eq!(
|
||||
table,
|
||||
BTreeMap::from([(shard(u64::MAX), first_shard_blocks)]),
|
||||
"After unwind, should revert to single shard with u64::MAX"
|
||||
);
|
||||
}
|
||||
|
||||
stage_test_suite_ext!(IndexStorageHistoryTestRunner, index_storage_history);
|
||||
|
||||
struct IndexStorageHistoryTestRunner {
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
//! Utils for `stages`.
|
||||
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
|
||||
use alloy_primitives::{Address, BlockNumber, TxNumber};
|
||||
use reth_config::config::EtlConfig;
|
||||
use reth_db_api::{
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
models::{
|
||||
sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
|
||||
AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
|
||||
AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedHistoryKey, ShardedKey,
|
||||
},
|
||||
table::{Decode, Decompress, Table},
|
||||
table::{Decode, Decompress, Key, Table},
|
||||
transaction::DbTx,
|
||||
BlockNumberList,
|
||||
};
|
||||
@@ -15,7 +15,7 @@ use reth_etl::Collector;
|
||||
use reth_primitives_traits::NodePrimitives;
|
||||
use reth_provider::{
|
||||
providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
|
||||
StaticFileProviderFactory,
|
||||
ProviderResult, StaticFileProviderFactory,
|
||||
};
|
||||
use reth_stages_api::StageError;
|
||||
use reth_static_file_types::StaticFileSegment;
|
||||
@@ -23,6 +23,169 @@ use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
|
||||
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
|
||||
use tracing::info;
|
||||
|
||||
/// Trait for writing sharded history indices to the database.
|
||||
pub(crate) trait HistoryShardWriter {
|
||||
/// The full sharded key type for the table.
|
||||
type TableKey: Key + ShardedHistoryKey;
|
||||
|
||||
/// Gets the last shard for a prefix (for incremental sync merging).
|
||||
fn get_last_shard(
|
||||
&mut self,
|
||||
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
|
||||
) -> ProviderResult<Option<BlockNumberList>>;
|
||||
|
||||
/// Writes a shard to the database (append or upsert based on flag).
|
||||
fn write_shard(
|
||||
&mut self,
|
||||
key: Self::TableKey,
|
||||
value: &BlockNumberList,
|
||||
append: bool,
|
||||
) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
/// Loads sharded history indices from a collector into the database.
|
||||
///
|
||||
/// ## Why sharding?
|
||||
/// History indices track "which blocks modified this address/storage slot". A popular contract
|
||||
/// may have millions of changes, too large for a single DB value. We split into shards of
|
||||
/// `NUM_OF_INDICES_IN_SHARD` (2000) block numbers each.
|
||||
///
|
||||
/// ## Key structure
|
||||
/// Each shard is keyed by `(prefix, highest_block_in_shard)`. Example for an address:
|
||||
/// - `(0xABC..., 5000)` → blocks 3001-5000
|
||||
/// - `(0xABC..., u64::MAX)` → blocks 5001-6234 (final shard)
|
||||
///
|
||||
/// The `u64::MAX` sentinel on the last shard enables `seek_exact(prefix, u64::MAX)` to find
|
||||
/// it for incremental sync merging.
|
||||
///
|
||||
/// When `append_only=true`, collector must yield keys in ascending order (MDBX requirement).
|
||||
fn load_sharded_history<H: HistoryShardWriter>(
|
||||
collector: &mut Collector<H::TableKey, BlockNumberList>,
|
||||
append_only: bool,
|
||||
writer: &mut H,
|
||||
) -> Result<(), StageError> {
|
||||
type Prefix<H> = <<H as HistoryShardWriter>::TableKey as ShardedHistoryKey>::Prefix;
|
||||
|
||||
// Option needed to distinguish "no prefix yet" from "processing Address::ZERO"
|
||||
let mut current_prefix: Option<Prefix<H>> = None;
|
||||
// Buffer for block numbers; sized for ~2 shards to minimize reallocations
|
||||
let mut current_list = Vec::<u64>::with_capacity(NUM_OF_INDICES_IN_SHARD * 2);
|
||||
|
||||
// Progress reporting setup
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 10).max(1);
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = H::TableKey::decode_owned(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
|
||||
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
|
||||
let prefix = sharded_key.prefix();
|
||||
|
||||
// When prefix changes, flush previous prefix's shards and start fresh
|
||||
if current_prefix != Some(prefix) {
|
||||
// Flush remaining shards for the previous prefix (uses u64::MAX for final shard)
|
||||
if let Some(prev_prefix) = current_prefix {
|
||||
flush_shards::<H>(prev_prefix, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
current_prefix = Some(prefix);
|
||||
current_list.clear();
|
||||
|
||||
// On incremental sync, merge with existing last shard (stored with u64::MAX key)
|
||||
if !append_only && let Some(last_shard) = writer.get_last_shard(prefix)? {
|
||||
current_list.extend(last_shard.iter());
|
||||
}
|
||||
}
|
||||
|
||||
// Accumulate new block numbers
|
||||
current_list.extend(new_list.iter());
|
||||
// Flush complete shards while keeping one buffered for continued accumulation
|
||||
flush_shards_partial::<H>(prefix, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
// Flush final prefix's remaining shard
|
||||
if let Some(prefix) = current_prefix {
|
||||
flush_shards::<H>(prefix, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes complete shards, keeping at least one shard buffered for continued accumulation.
|
||||
///
|
||||
/// We buffer one shard because `flush_shards` uses `u64::MAX` as the final shard's key.
|
||||
/// If we flushed everything here, we'd write `u64::MAX` keys that get overwritten later.
|
||||
fn flush_shards_partial<H: HistoryShardWriter>(
|
||||
prefix: <H::TableKey as ShardedHistoryKey>::Prefix,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut H,
|
||||
) -> Result<(), StageError> {
|
||||
// Not enough to fill a shard yet
|
||||
if list.len() <= NUM_OF_INDICES_IN_SHARD {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
|
||||
// Keep one shard buffered: if exact multiple, keep last full shard for u64::MAX key later
|
||||
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
|
||||
num_full_shards - 1
|
||||
} else {
|
||||
num_full_shards
|
||||
};
|
||||
|
||||
if shards_to_flush == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
|
||||
debug_assert!(flush_len <= list.len(), "flush_len exceeds list length");
|
||||
|
||||
// Write complete shards with their actual highest block number as key
|
||||
for chunk in list[..flush_len].chunks(NUM_OF_INDICES_IN_SHARD) {
|
||||
let highest = *chunk.last().expect("chunk is non-empty");
|
||||
let key = H::TableKey::new_sharded(prefix, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
writer.write_shard(key, &value, append_only)?;
|
||||
}
|
||||
|
||||
// Shift remaining elements to front (avoids allocation vs split_off)
|
||||
list.copy_within(flush_len.., 0);
|
||||
list.truncate(list.len() - flush_len);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes all remaining shards. Uses `u64::MAX` for the final shard's key to enable
|
||||
/// incremental sync lookups via `seek_exact(prefix, u64::MAX)`.
|
||||
fn flush_shards<H: HistoryShardWriter>(
|
||||
prefix: <H::TableKey as ShardedHistoryKey>::Prefix,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut H,
|
||||
) -> Result<(), StageError> {
|
||||
if list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
|
||||
|
||||
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
|
||||
let is_last = i == num_chunks - 1;
|
||||
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
|
||||
let key = H::TableKey::new_sharded(prefix, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
writer.write_shard(key, &value, append_only)?;
|
||||
}
|
||||
|
||||
list.clear();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Number of blocks before pushing indices from cache to [`Collector`]
|
||||
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
|
||||
|
||||
@@ -230,17 +393,40 @@ where
|
||||
Ok(collector)
|
||||
}
|
||||
|
||||
/// Loads account history indices into the database via `EitherWriter`.
|
||||
///
|
||||
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
|
||||
///
|
||||
/// ## Process
|
||||
/// Iterates over elements, grouping indices by their address. It flushes indices to disk
|
||||
/// when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
|
||||
/// ensuring the last previous address shard is stored.
|
||||
///
|
||||
/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
|
||||
/// incorrectly treating `Address::ZERO` as "no previous address".
|
||||
/// Adapter for writing account history shards via `EitherWriter`.
|
||||
struct AccountHistoryShardWriter<'a, 'tx, CURSOR, N> {
|
||||
writer: &'a mut EitherWriter<'tx, CURSOR, N>,
|
||||
}
|
||||
|
||||
impl<CURSOR, N: NodePrimitives> HistoryShardWriter for AccountHistoryShardWriter<'_, '_, CURSOR, N>
|
||||
where
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
|
||||
{
|
||||
type TableKey = ShardedKey<Address>;
|
||||
|
||||
fn get_last_shard(
|
||||
&mut self,
|
||||
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
|
||||
) -> ProviderResult<Option<BlockNumberList>> {
|
||||
self.writer.get_last_account_history_shard(prefix)
|
||||
}
|
||||
|
||||
fn write_shard(
|
||||
&mut self,
|
||||
key: Self::TableKey,
|
||||
value: &BlockNumberList,
|
||||
append: bool,
|
||||
) -> ProviderResult<()> {
|
||||
if append {
|
||||
self.writer.append_account_history(key, value)
|
||||
} else {
|
||||
self.writer.upsert_account_history(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads account history indices from the collector into the database.
|
||||
pub(crate) fn load_account_history<N, CURSOR>(
|
||||
mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
|
||||
append_only: bool,
|
||||
@@ -251,155 +437,8 @@ where
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
|
||||
{
|
||||
let mut current_address: Option<Address> = None;
|
||||
// Accumulator for block numbers where the current address changed.
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 10).max(1);
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
|
||||
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
|
||||
let address = sharded_key.key;
|
||||
|
||||
// When address changes, flush the previous address's shards and start fresh.
|
||||
if current_address != Some(address) {
|
||||
// Flush all remaining shards for the previous address (uses u64::MAX for last shard).
|
||||
if let Some(prev_addr) = current_address {
|
||||
flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
current_address = Some(address);
|
||||
current_list.clear();
|
||||
|
||||
// On incremental sync, merge with the existing last shard from the database.
|
||||
// The last shard is stored with key (address, u64::MAX) so we can find it.
|
||||
if !append_only &&
|
||||
let Some(last_shard) = writer.get_last_account_history_shard(address)?
|
||||
{
|
||||
current_list.extend(last_shard.iter());
|
||||
}
|
||||
}
|
||||
|
||||
// Append new block numbers to the accumulator.
|
||||
current_list.extend(new_list.iter());
|
||||
|
||||
// Flush complete shards, keeping the last (partial) shard buffered.
|
||||
flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
// Flush the final address's remaining shard.
|
||||
if let Some(addr) = current_address {
|
||||
flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
|
||||
///
|
||||
/// Only flushes when we have more than one shard's worth of data, keeping the last
|
||||
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
|
||||
/// that may need to be updated when more indices arrive.
|
||||
fn flush_account_history_shards_partial<N, CURSOR>(
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
|
||||
{
|
||||
// Nothing to flush if we haven't filled a complete shard yet.
|
||||
if list.len() <= NUM_OF_INDICES_IN_SHARD {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
|
||||
|
||||
// Always keep at least one shard buffered for continued accumulation.
|
||||
// If len is exact multiple of shard size, keep the last full shard.
|
||||
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
|
||||
num_full_shards - 1
|
||||
} else {
|
||||
num_full_shards
|
||||
};
|
||||
|
||||
if shards_to_flush == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Split: flush the first N shards, keep the remainder buffered.
|
||||
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
|
||||
let remainder = list.split_off(flush_len);
|
||||
|
||||
// Write each complete shard with its highest block number as the key.
|
||||
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
|
||||
let highest = *chunk.last().expect("chunk is non-empty");
|
||||
let key = ShardedKey::new(address, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
|
||||
if append_only {
|
||||
writer.append_account_history(key, &value)?;
|
||||
} else {
|
||||
writer.upsert_account_history(key, &value)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Keep the remaining indices for the next iteration.
|
||||
*list = remainder;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
|
||||
///
|
||||
/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
|
||||
/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
|
||||
fn flush_account_history_shards<N, CURSOR>(
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::AccountsHistory>,
|
||||
{
|
||||
if list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
|
||||
|
||||
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
|
||||
let is_last = i == num_chunks - 1;
|
||||
|
||||
// Use u64::MAX for the final shard's key. This invariant allows incremental sync
|
||||
// to find the last shard via seek_exact(address, u64::MAX) for merging.
|
||||
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
|
||||
|
||||
let key = ShardedKey::new(address, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
|
||||
if append_only {
|
||||
writer.append_account_history(key, &value)?;
|
||||
} else {
|
||||
writer.upsert_account_history(key, &value)?;
|
||||
}
|
||||
}
|
||||
|
||||
list.clear();
|
||||
Ok(())
|
||||
let mut adapter = AccountHistoryShardWriter { writer };
|
||||
load_sharded_history(&mut collector, append_only, &mut adapter)
|
||||
}
|
||||
|
||||
/// Called when database is ahead of static files. Attempts to find the first block we are missing
|
||||
@@ -438,17 +477,40 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Loads storage history indices into the database via `EitherWriter`.
|
||||
///
|
||||
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
|
||||
///
|
||||
/// ## Process
|
||||
/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes
|
||||
/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the
|
||||
/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored.
|
||||
///
|
||||
/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
|
||||
/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
|
||||
/// Adapter for writing storage history shards via `EitherWriter`.
|
||||
struct StorageHistoryShardWriter<'a, 'tx, CURSOR, N> {
|
||||
writer: &'a mut EitherWriter<'tx, CURSOR, N>,
|
||||
}
|
||||
|
||||
impl<CURSOR, N: NodePrimitives> HistoryShardWriter for StorageHistoryShardWriter<'_, '_, CURSOR, N>
|
||||
where
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
|
||||
{
|
||||
type TableKey = StorageShardedKey;
|
||||
|
||||
fn get_last_shard(
|
||||
&mut self,
|
||||
prefix: <Self::TableKey as ShardedHistoryKey>::Prefix,
|
||||
) -> ProviderResult<Option<BlockNumberList>> {
|
||||
self.writer.get_last_storage_history_shard(prefix.0, prefix.1)
|
||||
}
|
||||
|
||||
fn write_shard(
|
||||
&mut self,
|
||||
key: Self::TableKey,
|
||||
value: &BlockNumberList,
|
||||
append: bool,
|
||||
) -> ProviderResult<()> {
|
||||
if append {
|
||||
self.writer.append_storage_history(key, value)
|
||||
} else {
|
||||
self.writer.upsert_storage_history(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads storage history indices from the collector into the database.
|
||||
pub(crate) fn load_storage_history<N, CURSOR>(
|
||||
mut collector: Collector<StorageShardedKey, BlockNumberList>,
|
||||
append_only: bool,
|
||||
@@ -459,169 +521,6 @@ where
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
|
||||
{
|
||||
let mut current_key: Option<(Address, B256)> = None;
|
||||
// Accumulator for block numbers where the current (address, storage_key) changed.
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 10).max(1);
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = StorageShardedKey::decode_owned(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
|
||||
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
|
||||
let partial_key = (sharded_key.address, sharded_key.sharded_key.key);
|
||||
|
||||
// When (address, storage_key) changes, flush the previous key's shards and start fresh.
|
||||
if current_key != Some(partial_key) {
|
||||
// Flush all remaining shards for the previous key (uses u64::MAX for last shard).
|
||||
if let Some((prev_addr, prev_storage_key)) = current_key {
|
||||
flush_storage_history_shards(
|
||||
prev_addr,
|
||||
prev_storage_key,
|
||||
&mut current_list,
|
||||
append_only,
|
||||
writer,
|
||||
)?;
|
||||
}
|
||||
|
||||
current_key = Some(partial_key);
|
||||
current_list.clear();
|
||||
|
||||
// On incremental sync, merge with the existing last shard from the database.
|
||||
// The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
|
||||
if !append_only &&
|
||||
let Some(last_shard) =
|
||||
writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
|
||||
{
|
||||
current_list.extend(last_shard.iter());
|
||||
}
|
||||
}
|
||||
|
||||
// Append new block numbers to the accumulator.
|
||||
current_list.extend(new_list.iter());
|
||||
|
||||
// Flush complete shards, keeping the last (partial) shard buffered.
|
||||
flush_storage_history_shards_partial(
|
||||
partial_key.0,
|
||||
partial_key.1,
|
||||
&mut current_list,
|
||||
append_only,
|
||||
writer,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Flush the final key's remaining shard.
|
||||
if let Some((addr, storage_key)) = current_key {
|
||||
flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
|
||||
///
|
||||
/// Only flushes when we have more than one shard's worth of data, keeping the last
|
||||
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
|
||||
/// that may need to be updated when more indices arrive.
|
||||
fn flush_storage_history_shards_partial<N, CURSOR>(
|
||||
address: Address,
|
||||
storage_key: B256,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
|
||||
{
|
||||
// Nothing to flush if we haven't filled a complete shard yet.
|
||||
if list.len() <= NUM_OF_INDICES_IN_SHARD {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
|
||||
|
||||
// Always keep at least one shard buffered for continued accumulation.
|
||||
// If len is exact multiple of shard size, keep the last full shard.
|
||||
let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
|
||||
num_full_shards - 1
|
||||
} else {
|
||||
num_full_shards
|
||||
};
|
||||
|
||||
if shards_to_flush == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Split: flush the first N shards, keep the remainder buffered.
|
||||
let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
|
||||
let remainder = list.split_off(flush_len);
|
||||
|
||||
// Write each complete shard with its highest block number as the key.
|
||||
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
|
||||
let highest = *chunk.last().expect("chunk is non-empty");
|
||||
let key = StorageShardedKey::new(address, storage_key, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
|
||||
if append_only {
|
||||
writer.append_storage_history(key, &value)?;
|
||||
} else {
|
||||
writer.upsert_storage_history(key, &value)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Keep the remaining indices for the next iteration.
|
||||
*list = remainder;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard.
|
||||
///
|
||||
/// The `u64::MAX` key for the final shard is an invariant that allows
|
||||
/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental
|
||||
/// sync for merging with new indices.
|
||||
fn flush_storage_history_shards<N, CURSOR>(
|
||||
address: Address,
|
||||
storage_key: B256,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
N: NodePrimitives,
|
||||
CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
|
||||
+ DbCursorRO<reth_db_api::tables::StoragesHistory>,
|
||||
{
|
||||
if list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
|
||||
|
||||
for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
|
||||
let is_last = i == num_chunks - 1;
|
||||
|
||||
// Use u64::MAX for the final shard's key. This invariant allows incremental sync
|
||||
// to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging.
|
||||
let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
|
||||
|
||||
let key = StorageShardedKey::new(address, storage_key, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
|
||||
|
||||
if append_only {
|
||||
writer.append_storage_history(key, &value)?;
|
||||
} else {
|
||||
writer.upsert_storage_history(key, &value)?;
|
||||
}
|
||||
}
|
||||
|
||||
list.clear();
|
||||
Ok(())
|
||||
let mut adapter = StorageHistoryShardWriter { writer };
|
||||
load_sharded_history(&mut collector, append_only, &mut adapter)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ pub use reth_db_models::{
|
||||
AccountBeforeTx, ClientVersion, StaticFileBlockWithdrawals, StorageBeforeTx,
|
||||
StoredBlockBodyIndices, StoredBlockWithdrawals,
|
||||
};
|
||||
pub use sharded_key::ShardedKey;
|
||||
pub use sharded_key::{ShardedHistoryKey, ShardedKey};
|
||||
|
||||
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
|
||||
macro_rules! impl_uints {
|
||||
|
||||
@@ -10,6 +10,21 @@ use std::hash::Hash;
|
||||
/// Number of indices in one shard.
|
||||
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
|
||||
|
||||
/// Trait for sharded history keys that can be constructed from a prefix + block number.
|
||||
///
|
||||
/// This abstracts over the key construction and prefix extraction for sharded history tables
|
||||
/// like `AccountsHistory` and `StoragesHistory`.
|
||||
pub trait ShardedHistoryKey: Sized {
|
||||
/// The prefix type (e.g., `Address` for accounts, `(Address, B256)` for storage).
|
||||
type Prefix: Copy + Eq;
|
||||
|
||||
/// Creates a new sharded key from prefix and highest block number.
|
||||
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self;
|
||||
|
||||
/// Extracts the prefix from this key.
|
||||
fn prefix(&self) -> Self::Prefix;
|
||||
}
|
||||
|
||||
/// Size of `BlockNumber` in bytes (u64 = 8 bytes).
|
||||
const BLOCK_NUMBER_SIZE: usize = std::mem::size_of::<BlockNumber>();
|
||||
|
||||
@@ -77,6 +92,20 @@ impl Decode for ShardedKey<Address> {
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardedHistoryKey for ShardedKey<Address> {
|
||||
type Prefix = Address;
|
||||
|
||||
#[inline]
|
||||
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self {
|
||||
Self::new(prefix, highest_block_number)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn prefix(&self) -> Self::Prefix {
|
||||
self.key
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -110,4 +139,14 @@ mod tests {
|
||||
let decoded = ShardedKey::<Address>::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.highest_block_number, u64::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_history_key_trait_roundtrip() {
|
||||
let addr = address!("0102030405060708091011121314151617181920");
|
||||
let block_num = 0x123456789ABCDEFu64;
|
||||
|
||||
let key = ShardedKey::<Address>::new_sharded(addr, block_num);
|
||||
assert_eq!(key.prefix(), addr);
|
||||
assert_eq!(key.highest_block_number, block_num);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use alloy_primitives::{Address, BlockNumber, B256};
|
||||
use derive_more::AsRef;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::ShardedKey;
|
||||
use super::{ShardedHistoryKey, ShardedKey};
|
||||
|
||||
/// Number of indices in one shard.
|
||||
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
|
||||
@@ -91,6 +91,20 @@ impl Decode for StorageShardedKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl ShardedHistoryKey for StorageShardedKey {
|
||||
type Prefix = (Address, B256);
|
||||
|
||||
#[inline]
|
||||
fn new_sharded(prefix: Self::Prefix, highest_block_number: u64) -> Self {
|
||||
Self::new(prefix.0, prefix.1, highest_block_number)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn prefix(&self) -> Self::Prefix {
|
||||
(self.address, self.sharded_key.key)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -130,4 +144,15 @@ mod tests {
|
||||
let decoded = StorageShardedKey::decode(&encoded).unwrap();
|
||||
assert_eq!(decoded.sharded_key.highest_block_number, u64::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sharded_history_key_trait_roundtrip() {
|
||||
let addr = address!("0102030405060708091011121314151617181920");
|
||||
let storage_key = b256!("0001020304050607080910111213141516171819202122232425262728293031");
|
||||
let block_num = 0x123456789ABCDEFu64;
|
||||
|
||||
let key = StorageShardedKey::new_sharded((addr, storage_key), block_num);
|
||||
assert_eq!(key.prefix(), (addr, storage_key));
|
||||
assert_eq!(key.sharded_key.highest_block_number, block_num);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user